6 from zope.interface import implements
7 from twisted.internet import defer
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21 IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
24 class CiphertextFileNode:
25 def __init__(self, verifycap, storage_broker, secret_holder,
27 assert isinstance(verifycap, uri.CHKFileVerifierURI)
28 self._verifycap = verifycap
29 self._storage_broker = storage_broker
30 self._secret_holder = secret_holder
31 self._terminator = terminator
32 self._history = history
33 self._download_status = None
34 self._node = None # created lazily, on read()
36 def _maybe_create_download_node(self):
37 if not self._download_status:
38 ds = DownloadStatus(self._verifycap.storage_index,
41 self._history.add_download(ds)
42 self._download_status = ds
43 if self._node is None:
44 self._node = DownloadNode(self._verifycap, self._storage_broker,
47 self._history, self._download_status)
49 def read(self, consumer, offset=0, size=None):
50 """I am the main entry point, from which FileNode.read() can get
51 data. I feed the consumer with the desired range of ciphertext. I
52 return a Deferred that fires (with the consumer) when the read is
54 self._maybe_create_download_node()
55 return self._node.read(consumer, offset, size)
57 def get_segment(self, segnum):
58 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
59 Deferred that fires with (offset,data) when the desired segment is
60 available, and c is an object on which c.cancel() can be called to
61 disavow interest in the segment (after which 'd' will never fire).
63 You probably need to know the segment size before calling this,
64 unless you want the first few bytes of the file. If you ask for a
65 segment number which turns out to be too large, the Deferred will
66 errback with BadSegmentNumberError.
68 The Deferred fires with the offset of the first byte of the data
69 segment, so that you can call get_segment() before knowing the
70 segment size, and still know which data you received.
72 self._maybe_create_download_node()
73 return self._node.get_segment(segnum)
75 def get_segment_size(self):
76 # return a Deferred that fires with the file's real segment size
77 self._maybe_create_download_node()
78 return self._node.get_segsize()
80 def get_storage_index(self):
81 return self._verifycap.storage_index
82 def get_verify_cap(self):
83 return self._verifycap
85 return self._verifycap.size
87 def raise_error(self):
91 def check_and_repair(self, monitor, verify=False, add_lease=False):
92 c = Checker(verifycap=self._verifycap,
93 servers=self._storage_broker.get_connected_servers(),
94 verify=verify, add_lease=add_lease,
95 secret_holder=self._secret_holder,
98 d.addCallback(self._maybe_repair, monitor)
101 def _maybe_repair(self, cr, monitor):
102 crr = CheckAndRepairResults(self._verifycap.storage_index)
103 crr.pre_repair_results = cr
105 crr.post_repair_results = cr
106 return defer.succeed(crr)
108 crr.repair_attempted = True
109 crr.repair_successful = False # until proven successful
110 def _repair_error(f):
111 # as with mutable repair, I'm not sure if I want to pass
112 # through a failure or not. TODO
113 crr.repair_successful = False
114 crr.repair_failure = f
116 r = Repairer(self, storage_broker=self._storage_broker,
117 secret_holder=self._secret_holder,
120 d.addCallbacks(self._gather_repair_results, _repair_error,
121 callbackArgs=(cr, crr,))
124 def _gather_repair_results(self, ur, cr, crr):
125 assert IUploadResults.providedBy(ur), ur
126 # clone the cr (check results) to form the basis of the
127 # prr (post-repair results)
128 prr = CheckResults(cr.uri, cr.storage_index)
129 prr_data = copy.deepcopy(cr.get_data())
131 verifycap = self._verifycap
132 servers_responding = set(prr_data['servers-responding'])
133 sm = prr_data['sharemap']
134 assert isinstance(sm, DictOfSets), sm
135 for shnum, servers in ur.get_sharemap().items():
137 sm.add(shnum, s.get_serverid())
138 servers_responding.add(s.get_serverid())
139 servers_responding = sorted(servers_responding)
141 good_hosts = len(reduce(set.union, sm.itervalues(), set()))
142 is_healthy = bool(len(sm) >= verifycap.total_shares)
143 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
145 count_shares_needed=verifycap.needed_shares,
146 count_shares_expected=verifycap.total_shares,
147 count_shares_good=len(sm),
148 count_good_share_hosts=good_hosts,
149 count_recoverable_versions=int(is_recoverable),
150 count_unrecoverable_versions=int(not is_recoverable),
151 servers_responding=list(servers_responding),
153 count_wrong_shares=0, # no such thing as wrong, for immutable
154 list_corrupt_shares=prr_data["list-corrupt-shares"],
155 count_corrupt_shares=prr_data["count-corrupt-shares"],
156 list_incompatible_shares=prr_data["list-incompatible-shares"],
157 count_incompatible_shares=prr_data["count-incompatible-shares"],
159 prr.set_healthy(is_healthy)
160 prr.set_recoverable(is_recoverable)
161 crr.repair_successful = is_healthy
162 prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
164 crr.post_repair_results = prr
167 def check(self, monitor, verify=False, add_lease=False):
168 verifycap = self._verifycap
169 sb = self._storage_broker
170 servers = sb.get_connected_servers()
171 sh = self._secret_holder
173 v = Checker(verifycap=verifycap, servers=servers,
174 verify=verify, add_lease=add_lease, secret_holder=sh,
178 class DecryptingConsumer:
179 """I sit between a CiphertextDownloader (which acts as a Producer) and
180 the real Consumer, decrypting everything that passes by. The real
181 Consumer sees the real Producer, but the Producer sees us instead of the
183 implements(IConsumer, IDownloadStatusHandlingConsumer)
185 def __init__(self, consumer, readkey, offset):
186 self._consumer = consumer
188 self._download_status = None
189 # TODO: pycryptopp CTR-mode needs random-access operations: I want
190 # either a=AES(readkey, offset) or better yet both of:
191 # a=AES(readkey, offset=0)
192 # a.process(ciphertext, offset=xyz)
193 # For now, we fake it with the existing iv= argument.
194 offset_big = offset // 16
195 offset_small = offset % 16
196 iv = binascii.unhexlify("%032x" % offset_big)
197 self._decryptor = AES(readkey, iv=iv)
198 self._decryptor.process("\x00"*offset_small)
200 def set_download_status_read_event(self, read_ev):
201 self._read_ev = read_ev
202 def set_download_status(self, ds):
203 self._download_status = ds
205 def registerProducer(self, producer, streaming):
206 # this passes through, so the real consumer can flow-control the real
207 # producer. Therefore we don't need to provide any IPushProducer
208 # methods. We implement all the IConsumer methods as pass-throughs,
209 # and only intercept write() to perform decryption.
210 self._consumer.registerProducer(producer, streaming)
211 def unregisterProducer(self):
212 self._consumer.unregisterProducer()
213 def write(self, ciphertext):
215 plaintext = self._decryptor.process(ciphertext)
217 elapsed = now() - started
218 self._read_ev.update(0, elapsed, 0)
219 if self._download_status:
220 self._download_status.add_misc_event("AES", started, now())
221 self._consumer.write(plaintext)
223 class ImmutableFileNode:
224 implements(IImmutableFileNode)
226 # I wrap a CiphertextFileNode with a decryption key
227 def __init__(self, filecap, storage_broker, secret_holder, terminator,
229 assert isinstance(filecap, uri.CHKFileURI)
230 verifycap = filecap.get_verify_cap()
231 self._cnode = CiphertextFileNode(verifycap, storage_broker,
232 secret_holder, terminator, history)
233 assert isinstance(filecap, uri.CHKFileURI)
235 self._readkey = filecap.key
237 # TODO: I'm not sure about this.. what's the use case for node==node? If
238 # we keep it here, we should also put this on CiphertextFileNode
240 return self.u.__hash__()
241 def __eq__(self, other):
242 if isinstance(other, ImmutableFileNode):
243 return self.u.__eq__(other.u)
246 def __ne__(self, other):
247 if isinstance(other, ImmutableFileNode):
248 return self.u.__eq__(other.u)
252 def read(self, consumer, offset=0, size=None):
253 decryptor = DecryptingConsumer(consumer, self._readkey, offset)
254 d = self._cnode.read(decryptor, offset, size)
255 d.addCallback(lambda dc: consumer)
258 def raise_error(self):
261 def get_write_uri(self):
264 def get_readonly_uri(self):
265 return self.get_uri()
268 return self.u.to_string()
271 def get_readcap(self):
272 return self.u.get_readonly()
273 def get_verify_cap(self):
274 return self.u.get_verify_cap()
275 def get_repair_cap(self):
276 # CHK files can be repaired with just the verifycap
277 return self.u.get_verify_cap()
279 def get_storage_index(self):
280 return self.u.get_storage_index()
283 return self.u.get_size()
284 def get_current_size(self):
285 return defer.succeed(self.get_size())
287 def is_mutable(self):
290 def is_readonly(self):
293 def is_unknown(self):
296 def is_allowed_in_immutable_directory(self):
299 def check_and_repair(self, monitor, verify=False, add_lease=False):
300 return self._cnode.check_and_repair(monitor, verify, add_lease)
301 def check(self, monitor, verify=False, add_lease=False):
302 return self._cnode.check(monitor, verify, add_lease)
304 def get_best_readable_version(self):
306 Return an IReadable of the best version of this file. Since
307 immutable files can have only one version, we just return the
310 return defer.succeed(self)
313 def download_best_version(self):
315 Download the best version of this file, returning its contents
316 as a bytestring. Since there is only one version of an immutable
317 file, we download and return the contents of this file.
319 d = consumer.download_to_data(self)
322 # for an immutable file, download_to_data (specified in IReadable)
323 # is the same as download_best_version (specified in IFileNode). For
324 # mutable files, the difference is more meaningful, since they can
325 # have multiple versions.
326 download_to_data = download_best_version
329 # get_size() (IReadable), get_current_size() (IFilesystemNode), and
330 # get_size_of_best_version(IFileNode) are all the same for immutable
332 get_size_of_best_version = get_current_size