6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata import uri
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode
20 from allmydata.immutable.downloader.status import DownloadStatus
22 class CiphertextFileNode:
23 def __init__(self, verifycap, storage_broker, secret_holder,
24 terminator, history, download_status=None):
25 assert isinstance(verifycap, uri.CHKFileVerifierURI)
26 self._verifycap = verifycap
27 self._storage_broker = storage_broker
28 self._secret_holder = secret_holder
29 if download_status is None:
30 ds = DownloadStatus(verifycap.storage_index, verifycap.size)
32 history.add_download(ds)
34 self._terminator = terminator
35 self._history = history
36 self._download_status = download_status
37 self._node = None # created lazily, on read()
39 def _maybe_create_download_node(self):
40 if self._node is None:
41 self._node = DownloadNode(self._verifycap, self._storage_broker,
44 self._history, self._download_status)
46 def read(self, consumer, offset=0, size=None, read_ev=None):
47 """I am the main entry point, from which FileNode.read() can get
48 data. I feed the consumer with the desired range of ciphertext. I
49 return a Deferred that fires (with the consumer) when the read is
51 self._maybe_create_download_node()
52 return self._node.read(consumer, offset, size, read_ev)
54 def get_segment(self, segnum):
55 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
56 Deferred that fires with (offset,data) when the desired segment is
57 available, and c is an object on which c.cancel() can be called to
58 disavow interest in the segment (after which 'd' will never fire).
60 You probably need to know the segment size before calling this,
61 unless you want the first few bytes of the file. If you ask for a
62 segment number which turns out to be too large, the Deferred will
63 errback with BadSegmentNumberError.
65 The Deferred fires with the offset of the first byte of the data
66 segment, so that you can call get_segment() before knowing the
67 segment size, and still know which data you received.
69 self._maybe_create_download_node()
70 return self._node.get_segment(segnum)
72 def get_segment_size(self):
73 # return a Deferred that fires with the file's real segment size
74 self._maybe_create_download_node()
75 return self._node.get_segsize()
77 def get_storage_index(self):
78 return self._verifycap.storage_index
79 def get_verify_cap(self):
80 return self._verifycap
82 return self._verifycap.size
84 def raise_error(self):
88 def check_and_repair(self, monitor, verify=False, add_lease=False):
89 verifycap = self._verifycap
90 storage_index = verifycap.storage_index
91 sb = self._storage_broker
92 servers = sb.get_all_servers()
93 sh = self._secret_holder
95 c = Checker(verifycap=verifycap, servers=servers,
96 verify=verify, add_lease=add_lease, secret_holder=sh,
99 def _maybe_repair(cr):
100 crr = CheckAndRepairResults(storage_index)
101 crr.pre_repair_results = cr
103 crr.post_repair_results = cr
104 return defer.succeed(crr)
106 crr.repair_attempted = True
107 crr.repair_successful = False # until proven successful
108 def _gather_repair_results(ur):
109 assert IUploadResults.providedBy(ur), ur
110 # clone the cr (check results) to form the basis of the
111 # prr (post-repair results)
112 prr = CheckResults(cr.uri, cr.storage_index)
113 prr.data = copy.deepcopy(cr.data)
115 sm = prr.data['sharemap']
116 assert isinstance(sm, DictOfSets), sm
117 sm.update(ur.sharemap)
118 servers_responding = set(prr.data['servers-responding'])
119 servers_responding.union(ur.sharemap.iterkeys())
120 prr.data['servers-responding'] = list(servers_responding)
121 prr.data['count-shares-good'] = len(sm)
122 prr.data['count-good-share-hosts'] = len(sm)
123 is_healthy = bool(len(sm) >= verifycap.total_shares)
124 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
125 prr.set_healthy(is_healthy)
126 prr.set_recoverable(is_recoverable)
127 crr.repair_successful = is_healthy
128 prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
130 crr.post_repair_results = prr
132 def _repair_error(f):
133 # as with mutable repair, I'm not sure if I want to pass
134 # through a failure or not. TODO
135 crr.repair_successful = False
136 crr.repair_failure = f
138 r = Repairer(self, storage_broker=sb, secret_holder=sh,
141 d.addCallbacks(_gather_repair_results, _repair_error)
144 d.addCallback(_maybe_repair)
147 def check(self, monitor, verify=False, add_lease=False):
148 verifycap = self._verifycap
149 sb = self._storage_broker
150 servers = sb.get_all_servers()
151 sh = self._secret_holder
153 v = Checker(verifycap=verifycap, servers=servers,
154 verify=verify, add_lease=add_lease, secret_holder=sh,
159 class DecryptingConsumer:
160 """I sit between a CiphertextDownloader (which acts as a Producer) and
161 the real Consumer, decrypting everything that passes by. The real
162 Consumer sees the real Producer, but the Producer sees us instead of the
164 implements(IConsumer)
166 def __init__(self, consumer, readkey, offset, read_event):
167 self._consumer = consumer
168 self._read_event = read_event
169 # TODO: pycryptopp CTR-mode needs random-access operations: I want
170 # either a=AES(readkey, offset) or better yet both of:
171 # a=AES(readkey, offset=0)
172 # a.process(ciphertext, offset=xyz)
173 # For now, we fake it with the existing iv= argument.
174 offset_big = offset // 16
175 offset_small = offset % 16
176 iv = binascii.unhexlify("%032x" % offset_big)
177 self._decryptor = AES(readkey, iv=iv)
178 self._decryptor.process("\x00"*offset_small)
180 def registerProducer(self, producer, streaming):
181 # this passes through, so the real consumer can flow-control the real
182 # producer. Therefore we don't need to provide any IPushProducer
183 # methods. We implement all the IConsumer methods as pass-throughs,
184 # and only intercept write() to perform decryption.
185 self._consumer.registerProducer(producer, streaming)
186 def unregisterProducer(self):
187 self._consumer.unregisterProducer()
188 def write(self, ciphertext):
190 plaintext = self._decryptor.process(ciphertext)
191 elapsed = now() - started
192 self._read_event.update(0, elapsed, 0)
193 self._consumer.write(plaintext)
195 class ImmutableFileNode:
196 implements(IImmutableFileNode)
198 # I wrap a CiphertextFileNode with a decryption key
199 def __init__(self, filecap, storage_broker, secret_holder, terminator,
201 assert isinstance(filecap, uri.CHKFileURI)
202 verifycap = filecap.get_verify_cap()
203 ds = DownloadStatus(verifycap.storage_index, verifycap.size)
205 history.add_download(ds)
206 self._download_status = ds
207 self._cnode = CiphertextFileNode(verifycap, storage_broker,
208 secret_holder, terminator, history, ds)
209 assert isinstance(filecap, uri.CHKFileURI)
211 self._readkey = filecap.key
213 # TODO: I'm not sure about this.. what's the use case for node==node? If
214 # we keep it here, we should also put this on CiphertextFileNode
216 return self.u.__hash__()
217 def __eq__(self, other):
218 if isinstance(other, ImmutableFileNode):
219 return self.u.__eq__(other.u)
222 def __ne__(self, other):
223 if isinstance(other, ImmutableFileNode):
224 return self.u.__eq__(other.u)
228 def read(self, consumer, offset=0, size=None):
230 if actual_size == None:
231 actual_size = self.u.size
232 actual_size = actual_size - offset
233 read_ev = self._download_status.add_read_event(offset,actual_size,
235 decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
236 d = self._cnode.read(decryptor, offset, size, read_ev)
237 d.addCallback(lambda dc: consumer)
240 def raise_error(self):
243 def get_write_uri(self):
246 def get_readonly_uri(self):
247 return self.get_uri()
250 return self.u.to_string()
253 def get_readcap(self):
254 return self.u.get_readonly()
255 def get_verify_cap(self):
256 return self.u.get_verify_cap()
257 def get_repair_cap(self):
258 # CHK files can be repaired with just the verifycap
259 return self.u.get_verify_cap()
261 def get_storage_index(self):
262 return self.u.get_storage_index()
265 return self.u.get_size()
266 def get_current_size(self):
267 return defer.succeed(self.get_size())
269 def is_mutable(self):
272 def is_readonly(self):
275 def is_unknown(self):
278 def is_allowed_in_immutable_directory(self):
281 def check_and_repair(self, monitor, verify=False, add_lease=False):
282 return self._cnode.check_and_repair(monitor, verify, add_lease)
283 def check(self, monitor, verify=False, add_lease=False):
284 return self._cnode.check(monitor, verify, add_lease)