6 from zope.interface import implements, Interface
7 from twisted.internet import defer
8 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata import uri
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode
20 from allmydata.immutable.downloader.status import DownloadStatus
class IDownloadStatusHandlingConsumer(Interface):
    """Consumers providing me want to be told about the download's
    'read event' before data starts flowing."""

    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event'.

        The consumer is expected to update the event with the time it
        spends decrypting each chunk of data.
        """
class CiphertextFileNode:
    """I represent the ciphertext of an immutable file, named by a verifycap.

    I create my DownloadNode lazily, and offer ciphertext reads, segment
    access, checking, and check-and-repair. I do not hold a decryption key.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        """Remember my collaborators; no download machinery is built yet.

        verifycap must be a uri.CHKFileVerifierURI. The other arguments are
        stored as-is and handed to the DownloadNode, Checker and Repairer
        when those are created.
        """
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create my DownloadStatus and DownloadNode if they do not exist."""
        if not self._download_status:
            # NOTE(review): the second argument and the history guard sat on
            # lines missing from the reviewed text; restored from context.
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            # NOTE(review): the secret_holder/terminator arguments sat on
            # lines missing from the reviewed text; confirm the order.
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        If size is None, the read event is recorded as covering everything
        from offset to the end of the file; the DownloadNode itself still
        receives the caller's original size value.
        """
        self._maybe_create_download_node()
        actual_size = size
        if actual_size is None:
            actual_size = self._verifycap.size - offset
        # NOTE(review): the final argument sat on a line missing from the
        # reviewed text; the current time is assumed.
        read_ev = self._download_status.add_read_event(offset, actual_size,
                                                       now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)
        return self._node.read(consumer, offset, size, read_ev)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in my verifycap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verifycap I was created with."""
        return self._verifycap

    def get_size(self):
        """Return the file size recorded in my verifycap."""
        return self._verifycap.size

    def raise_error(self):
        """Do nothing."""
        # NOTE(review): the body was missing from the reviewed text; a no-op
        # is assumed.
        pass

    @staticmethod
    def _servers_responding_after_repair(previously_responding, repair_sharemap):
        """Return a list of the servers known to respond after a repair.

        previously_responding is an iterable of servers. repair_sharemap
        maps each share identifier to an iterable of the servers that now
        hold that share; every such server is added to the result.
        """
        responding = set(previously_responding)
        for servers in repair_sharemap.values():
            responding.update(servers)
        return list(responding)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check my shares and, if the file is not healthy, attempt a repair.

        Returns a Deferred that fires with a CheckAndRepairResults. Its
        pre_repair_results is always the checker's result. If the file was
        already healthy, post_repair_results is that same object; otherwise
        a repair is attempted and post_repair_results is built from the
        checker's data merged with the repairer's upload results. If the
        repair fails, the failure is recorded on the results object and
        passed through, so the Deferred errbacks.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        c = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        d = c.start()

        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful

            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sm = prr.data['sharemap']
                assert isinstance(sm, DictOfSets), sm
                sm.update(ur.sharemap)
                # The sharemap is keyed by share (len(sm) is used as the
                # share count below), so the servers are in its values. The
                # merged set must be kept: set.union() returns a new set and
                # leaves the receiver untouched.
                # NOTE(review): assumes each ur.sharemap value is an iterable
                # of servers, as sm.update(ur.sharemap) suggests -- confirm.
                prr.data['servers-responding'] = \
                    self._servers_responding_after_repair(
                        prr.data['servers-responding'], ur.sharemap)
                prr.data['count-shares-good'] = len(sm)
                # NOTE(review): this counts shares, not distinct hosts;
                # left unchanged, confirm the intended meaning.
                prr.data['count-good-share-hosts'] = len(sm)
                is_healthy = bool(len(sm) >= verifycap.total_shares)
                is_recoverable = bool(len(sm) >= verifycap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                # NOTE(review): same condition as is_healthy; left unchanged,
                # but "needs rebalancing when every share is present" looks
                # suspicious -- confirm.
                prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr

            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f

            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            repair_d = r.start()
            repair_d.addCallbacks(_gather_repair_results, _repair_error)
            return repair_d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Run a Checker over the connected servers and return its Deferred.

        verify, add_lease and monitor are passed straight to the Checker.
        """
        verifycap = self._verifycap
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """Wrap 'consumer', preparing to decrypt ciphertext that starts at
        byte 'offset' of the file, using the AES key 'readkey'."""
        self._consumer = consumer
        self._read_event = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        offset_big = offset // 16
        offset_small = offset % 16
        # the IV is the 16-byte-block index, as a 128-bit big-endian number
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        # consume the bytes of the first block that precede 'offset', so the
        # decryptor is positioned at exactly the requested byte
        self._decryptor.process("\x00"*offset_small)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event whose decrypt time write() will update."""
        self._read_event = read_ev

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Pass through to the real consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext' and hand the plaintext to the real consumer.

        If a read event has been set, it is updated with the time spent
        decrypting this chunk.
        """
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        # _read_event stays None unless set_download_status_read_event() was
        # called, so it must not be updated unconditionally.
        if self._read_event is not None:
            elapsed = now() - started
            self._read_event.update(0, elapsed, 0)
        self._consumer.write(plaintext)
class ImmutableFileNode:
    """I wrap a CiphertextFileNode with a decryption key.

    Reads are decrypted on the way to the caller's consumer; checking and
    repair are delegated unchanged to the ciphertext node.
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        """Build my CiphertextFileNode from filecap's verifycap.

        filecap must be a uri.CHKFileURI; its 'key' attribute is used as the
        read key. The remaining arguments go to CiphertextFileNode.
        """
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        """Nodes are equal when both are ImmutableFileNodes with equal caps."""
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        return False

    def __ne__(self, other):
        """Exact negation of __eq__."""
        # Python 2 does not derive != from ==, so this is spelled out.
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with plaintext for the requested range.

        Returns a Deferred that fires with the caller's consumer (not the
        decrypting wrapper) when the underlying ciphertext read finishes.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing."""
        # NOTE(review): the body was missing from the reviewed text; a no-op
        # is assumed.
        pass

    def get_write_uri(self):
        """Return None."""
        # NOTE(review): the body was missing from the reviewed text; None is
        # assumed, as this class exposes no write capability.
        return None

    def get_readonly_uri(self):
        """Return the same string as get_uri()."""
        return self.get_uri()

    def get_uri(self):
        """Return my cap in string form."""
        return self.u.to_string()

    def get_cap(self):
        """Return my cap object."""
        # NOTE(review): two lines were missing from the reviewed text at
        # this position; this accessor is assumed -- confirm.
        return self.u

    def get_readcap(self):
        """Return my cap's read-only form."""
        return self.u.get_readonly()

    def get_verify_cap(self):
        """Return my cap's verifycap."""
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return my cap's storage index."""
        return self.u.get_storage_index()

    def get_size(self):
        """Return the file size recorded in my cap."""
        return self.u.get_size()

    def get_current_size(self):
        """Return an already-fired Deferred holding get_size()."""
        return defer.succeed(self.get_size())

    # NOTE(review): the bodies of the four predicates below were missing
    # from the reviewed text; the values are those expected of an immutable,
    # read-only, known node -- confirm.
    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to my CiphertextFileNode."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to my CiphertextFileNode."""
        return self._cnode.check(monitor, verify, add_lease)