5 from zope.interface import implements
6 from twisted.internet import defer
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20 IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
# CiphertextFileNode: reads, checks and repairs the ciphertext of an immutable
# (CHK) file identified by its verify-cap. ImmutableFileNode wraps it and adds
# the decryption key.
23 class CiphertextFileNode:
# NOTE(review): the continuation of this signature (original line 25) is
# missing from this listing. The body uses `terminator` and `history`.
# ImmutableFileNode calls this constructor with five positional arguments
# (verifycap, storage_broker, secret_holder, terminator, history).
24 def __init__(self, verifycap, storage_broker, secret_holder,
26 assert isinstance(verifycap, uri.CHKFileVerifierURI)
27 self._verifycap = verifycap
28 self._storage_broker = storage_broker
29 self._secret_holder = secret_holder
30 self._terminator = terminator
31 self._history = history
# Both of the following are filled in lazily by _maybe_create_download_node().
32 self._download_status = None
33 self._node = None # created lazily, on read()
# Lazily create the DownloadStatus (registered via history.add_download) and
# the DownloadNode. read(), get_segment() and get_segment_size() all call this
# before touching self._node.
# NOTE(review): original lines 38-39 (the rest of the DownloadStatus(...) call
# and whatever precedes add_download) and 44-45 (middle arguments of
# DownloadNode(...)) are missing from this listing.
35 def _maybe_create_download_node(self):
36 if not self._download_status:
37 ds = DownloadStatus(self._verifycap.storage_index,
40 self._history.add_download(ds)
41 self._download_status = ds
42 if self._node is None:
43 self._node = DownloadNode(self._verifycap, self._storage_broker,
46 self._history, self._download_status)
def read(self, consumer, offset=0, size=None):
    """Feed ``consumer`` with the desired range of ciphertext.

    I am the main entry point from which FileNode.read() can get data.
    The underlying download node is created lazily on first use.

    :param consumer: object that receives the ciphertext.
    :param offset: byte offset at which the requested range starts.
    :param size: length of the requested range; passed unchanged to the
        download node, which defines how None is interpreted.
    :return: a Deferred that fires (with the consumer) when the read is
        complete.
    """
    self._maybe_create_download_node()
    return self._node.read(consumer, offset, size)
def get_segment(self, segnum):
    """Begin downloading a segment.

    I return a tuple (d, c): 'd' is a Deferred that fires with
    (offset, data) when the desired segment is available, and 'c' is an
    object on which c.cancel() can be called to disavow interest in the
    segment (after which 'd' will never fire).

    You probably need to know the segment size before calling this,
    unless you want the first few bytes of the file. If you ask for a
    segment number which turns out to be too large, the Deferred will
    errback with BadSegmentNumberError.

    The Deferred fires with the offset of the first byte of the data
    segment, so that you can call get_segment() before knowing the
    segment size, and still know which data you received.
    """
    self._maybe_create_download_node()
    return self._node.get_segment(segnum)
def get_segment_size(self):
    """Return a Deferred that fires with the file's real segment size.

    The download node is created lazily if it does not exist yet.
    """
    self._maybe_create_download_node()
    return self._node.get_segsize()
def get_storage_index(self):
    """Return the storage index recorded in this node's verify-cap."""
    return self._verifycap.storage_index

def get_verify_cap(self):
    """Return the verify-cap this node was constructed with."""
    return self._verifycap
# NOTE(review): the `def` line this body belongs to (original line 83) is
# missing from this listing; the surviving body returns the verify-cap's
# `size` attribute.
84 return self._verifycap.size
# NOTE(review): the body of raise_error (original lines 87-91) is missing from
# this listing.
86 def raise_error(self):
# Run a Checker over the currently-connected servers, then pass its results
# to _maybe_repair(), which decides whether to run a Repairer.
# NOTE(review): original lines 97-98 (the end of the Checker(...) call and
# the statement that creates `d`) and 100-101 (presumably `return d`) are
# missing from this listing.
92 def check_and_repair(self, monitor, verify=False, add_lease=False):
93 c = Checker(verifycap=self._verifycap,
94 servers=self._storage_broker.get_connected_servers(),
95 verify=verify, add_lease=add_lease,
96 secret_holder=self._secret_holder,
99 d.addCallback(self._maybe_repair, monitor)
# Wrap the check results `cr` in a CheckAndRepairResults.
# - On the early-return path, the pre-repair results double as the
#   post-repair results.
# - Otherwise, run a Repairer. Success goes to _gather_repair_results();
#   failure is recorded on crr by _repair_error.
# NOTE(review): several original lines are missing from this listing:
# - 105: the guard for the early return, presumably a health test on `cr`;
# - 108;
# - 116: presumably the errback's return value;
# - 119-120: the end of Repairer(...) and the statement creating `d`;
# - 123-124: presumably `return d`.
102 def _maybe_repair(self, cr, monitor):
103 crr = CheckAndRepairResults(self._verifycap.storage_index)
104 crr.pre_repair_results = cr
106 crr.post_repair_results = cr
107 return defer.succeed(crr)
109 crr.repair_attempted = True
110 crr.repair_successful = False # until proven successful
111 def _repair_error(f):
112 # as with mutable repair, I'm not sure if I want to pass
113 # through a failure or not. TODO
114 crr.repair_successful = False
115 crr.repair_failure = f
117 r = Repairer(self, storage_broker=self._storage_broker,
118 secret_holder=self._secret_holder,
121 d.addCallbacks(self._gather_repair_results, _repair_error,
122 callbackArgs=(cr, crr,))
# Build post-repair CheckResults (prr) and record them on `crr`.
# It merges two sharemaps (shnum -> servers):
# - the pre-repair sharemap from the check results `cr`;
# - the sharemap reported by the repair upload results `ur`.
# Servers that received shares during the repair are also added to the
# responding set.
# NOTE(review): several original lines are missing from this listing:
# - 129;
# - 132: must create `sm`, used below as a shnum -> set-of-servers map;
# - 157;
# - 163-166: the end of the CheckResults(...) call.
# NOTE(review): needs_rebalancing uses exactly the same expression as
# is_healthy, so every healthy file is reported as needing rebalancing.
# Confirm this is intended.
# NOTE(review): `reduce` is a builtin only on Python 2. Also, the sorted(...)
# result is re-wrapped in list(...) below, which is redundant.
125 def _gather_repair_results(self, ur, cr, crr):
126 assert IUploadResults.providedBy(ur), ur
127 # clone the cr (check results) to form the basis of the
128 # prr (post-repair results)
130 verifycap = self._verifycap
131 servers_responding = set(cr.get_servers_responding())
133 assert isinstance(cr.get_sharemap(), DictOfSets)
134 for shnum, servers in cr.get_sharemap().items():
135 for server in servers:
136 sm.add(shnum, server)
137 for shnum, servers in ur.get_sharemap().items():
138 for server in servers:
139 sm.add(shnum, server)
140 servers_responding.add(server)
141 servers_responding = sorted(servers_responding)
# good_hosts: number of distinct servers holding at least one share.
# len(sm): number of distinct share numbers present.
143 good_hosts = len(reduce(set.union, sm.values(), set()))
144 is_healthy = bool(len(sm) >= verifycap.total_shares)
145 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
146 needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
147 prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
148 healthy=is_healthy, recoverable=is_recoverable,
149 needs_rebalancing=needs_rebalancing,
150 count_shares_needed=verifycap.needed_shares,
151 count_shares_expected=verifycap.total_shares,
152 count_shares_good=len(sm),
153 count_good_share_hosts=good_hosts,
154 count_recoverable_versions=int(is_recoverable),
155 count_unrecoverable_versions=int(not is_recoverable),
156 servers_responding=list(servers_responding),
158 count_wrong_shares=0, # no such thing as wrong, for immutable
159 list_corrupt_shares=cr.get_corrupt_shares(),
160 count_corrupt_shares=len(cr.get_corrupt_shares()),
161 list_incompatible_shares=cr.get_incompatible_shares(),
162 count_incompatible_shares=len(cr.get_incompatible_shares()),
# The repair counts as successful exactly when the merged sharemap is healthy.
167 crr.repair_successful = is_healthy
168 crr.post_repair_results = prr
# Check (without repairing) this file against the currently-connected servers.
# NOTE(review): original lines 176 and 179-180 (the end of the Checker(...)
# call and whatever starts it / returns its result) are missing from this
# listing.
171 def check(self, monitor, verify=False, add_lease=False):
172 verifycap = self._verifycap
173 sb = self._storage_broker
174 servers = sb.get_connected_servers()
175 sh = self._secret_holder
177 v = Checker(verifycap=verifycap, servers=servers,
178 verify=verify, add_lease=add_lease, secret_holder=sh,
# DecryptingConsumer: an IConsumer shim that decrypts ciphertext with AES
# before handing it to the real consumer.
# __init__ positions the cipher at byte `offset` of the stream:
# - the IV encodes offset // 16 (the AES block index) as a 32-hex-digit
#   (16-byte) big-endian counter;
# - the remaining offset % 16 bytes are consumed by processing zero bytes,
#   so the keystream lines up with `offset`.
# NOTE(review): original line 186 (the end of the class docstring) and line
# 191 (one attribute initialisation in __init__) are missing from this
# listing. write() calls self._read_ev.update() with no None check, so
# set_download_status_read_event() must run before any write().
# NOTE(review): `binascii` (used here) and `now` (used in write()) are not
# among the imports visible in this listing; confirm they are imported.
182 class DecryptingConsumer:
183 """I sit between a CiphertextDownloader (which acts as a Producer) and
184 the real Consumer, decrypting everything that passes by. The real
185 Consumer sees the real Producer, but the Producer sees us instead of the
187 implements(IConsumer, IDownloadStatusHandlingConsumer)
189 def __init__(self, consumer, readkey, offset):
190 self._consumer = consumer
192 self._download_status = None
193 # TODO: pycryptopp CTR-mode needs random-access operations: I want
194 # either a=AES(readkey, offset) or better yet both of:
195 # a=AES(readkey, offset=0)
196 # a.process(ciphertext, offset=xyz)
197 # For now, we fake it with the existing iv= argument.
198 offset_big = offset // 16
199 offset_small = offset % 16
200 iv = binascii.unhexlify("%032x" % offset_big)
201 self._decryptor = AES(readkey, iv=iv)
202 self._decryptor.process("\x00"*offset_small)
def set_download_status_read_event(self, read_ev):
    """Remember the read event that write() reports decryption time to."""
    self._read_ev = read_ev

def set_download_status(self, ds):
    """Remember the DownloadStatus on which write() records "AES" events."""
    self._download_status = ds

def registerProducer(self, producer, streaming):
    """Pass producer registration straight through to the real consumer.

    Because this passes through, the real consumer can flow-control the
    real producer directly, so no IPushProducer methods are needed here.
    All IConsumer methods are pass-throughs; only write() is intercepted,
    to perform decryption.
    """
    self._consumer.registerProducer(producer, streaming)

def unregisterProducer(self):
    """Pass producer unregistration straight through to the real consumer."""
    self._consumer.unregisterProducer()
# Decrypt `ciphertext` and report elapsed time via self._read_ev.update(0,
# elapsed, 0). If a DownloadStatus was set, also record an "AES" event on it.
# Finally, forward the plaintext to the real consumer.
# NOTE(review): original line 218, which must define `started` (presumably
# from now()), and line 220 are missing from this listing.
217 def write(self, ciphertext):
219 plaintext = self._decryptor.process(ciphertext)
221 elapsed = now() - started
222 self._read_ev.update(0, elapsed, 0)
223 if self._download_status:
224 self._download_status.add_misc_event("AES", started, now())
225 self._consumer.write(plaintext)
# ImmutableFileNode: the IImmutableFileNode for a CHK file.
# - It builds a CiphertextFileNode from the filecap's verify-cap.
# - It keeps the filecap's key as the read key; read() uses it to decrypt
#   what that node delivers.
# NOTE(review): several original lines are missing from this listing:
# - 232: the continuation of the __init__ signature; the body uses
#   `history`, so the signature presumably ends with it;
# - 238: other methods read self.u, so this line presumably sets it.
# Confirm both.
# NOTE(review): the second `assert isinstance(filecap, uri.CHKFileURI)`
# repeats the first one and can be removed.
227 class ImmutableFileNode:
228 implements(IImmutableFileNode)
230 # I wrap a CiphertextFileNode with a decryption key
231 def __init__(self, filecap, storage_broker, secret_holder, terminator,
233 assert isinstance(filecap, uri.CHKFileURI)
234 verifycap = filecap.get_verify_cap()
235 self._cnode = CiphertextFileNode(verifycap, storage_broker,
236 secret_holder, terminator, history)
237 assert isinstance(filecap, uri.CHKFileURI)
239 self._readkey = filecap.key
# NOTE(review): original line 243 (the `def` line for the hash method whose
# body follows) and lines 248-249 (the branch of __eq__ for objects that are
# not ImmutableFileNodes) are missing from this listing.
241 # TODO: I'm not sure about this.. what's the use case for node==node? If
242 # we keep it here, we should also put this on CiphertextFileNode
244 return self.u.__hash__()
# Two ImmutableFileNodes compare equal when their filecaps (self.u) do.
245 def __eq__(self, other):
246 if isinstance(other, ImmutableFileNode):
247 return self.u.__eq__(other.u)
def __ne__(self, other):
    """Return True when this node and ``other`` are not equal.

    This is the logical negation of ``__eq__``: two ImmutableFileNodes are
    compared by their filecaps (``self.u``), and objects of any other type
    are treated as unequal.
    """
    if isinstance(other, ImmutableFileNode):
        # The previous body returned self.u.__eq__(other.u), i.e. *equality*,
        # so `node != node` was True and `node != different_node` was False.
        return not self.u.__eq__(other.u)
    return True
def read(self, consumer, offset=0, size=None):
    """Decrypt the requested range of this file into ``consumer``.

    The ciphertext node delivers ciphertext to a DecryptingConsumer, which
    is positioned at ``offset`` and forwards plaintext to ``consumer``.

    :param consumer: the real consumer that receives plaintext.
    :param offset: byte offset at which the requested range starts.
    :param size: length of the requested range; passed unchanged to the
        ciphertext node.
    :return: a Deferred that fires with ``consumer`` when the read is done.
    """
    decryptor = DecryptingConsumer(consumer, self._readkey, offset)
    d = self._cnode.read(decryptor, offset, size)
    # Fire with the caller's consumer rather than the internal decryptor.
    d.addCallback(lambda dc: consumer)
    # Previously the Deferred was never returned, so callers got None.
    return d
# NOTE(review): the following original lines are missing from this listing:
# - 263 and 266: the bodies of raise_error and get_write_uri;
# - 271: the `def` line for the final `return self.u.to_string()` body,
#   presumably get_uri, which get_readonly_uri calls.
# get_readonly_uri simply returns get_uri().
262 def raise_error(self):
265 def get_write_uri(self):
268 def get_readonly_uri(self):
269 return self.get_uri()
272 return self.u.to_string()
def get_readcap(self):
    """Return the read-only form of this node's filecap."""
    return self.u.get_readonly()

def get_verify_cap(self):
    """Return the verify-cap derived from this node's filecap."""
    return self.u.get_verify_cap()

def get_repair_cap(self):
    """Return the cap needed to repair this file.

    CHK files can be repaired with just the verify-cap.
    """
    return self.u.get_verify_cap()

def get_storage_index(self):
    """Return the storage index of this node's filecap."""
    return self.u.get_storage_index()
# NOTE(review): the following original lines are missing from this listing:
# - 286: the `def` line for the first body below, which returns
#   self.u.get_size(); get_current_size calls self.get_size();
# - 292-293, 295-296, 298-299 and 301-302: the bodies of the four predicates
#   at the end.
287 return self.u.get_size()
# Return get_size() wrapped in an already-fired Deferred.
288 def get_current_size(self):
289 return defer.succeed(self.get_size())
291 def is_mutable(self):
294 def is_readonly(self):
297 def is_unknown(self):
300 def is_allowed_in_immutable_directory(self):
def check_and_repair(self, monitor, verify=False, add_lease=False):
    """Check this file and repair it if needed.

    Delegates unchanged to the wrapped CiphertextFileNode.
    """
    return self._cnode.check_and_repair(monitor, verify, add_lease)

def check(self, monitor, verify=False, add_lease=False):
    """Check this file's health without repairing it.

    Delegates unchanged to the wrapped CiphertextFileNode.
    """
    return self._cnode.check(monitor, verify, add_lease)
def get_best_readable_version(self):
    """Return a Deferred that fires with an IReadable of the best version.

    Since immutable files can have only one version, this node is itself
    the best (and only) readable version.
    """
    return defer.succeed(self)
def download_best_version(self):
    """Download the best version of this file.

    Returns the result of consumer.download_to_data(self), which delivers
    the file's contents as a bytestring. Since there is only one version
    of an immutable file, this downloads the contents of this file.
    """
    d = consumer.download_to_data(self)
    # Previously `d` was computed but not returned, so callers got None.
    return d

# For an immutable file, download_to_data (specified in IReadable) is the
# same as download_best_version (specified in IFileNode). For mutable files
# the difference is more meaningful, since they can have multiple versions.
download_to_data = download_best_version
333 # get_size() (IReadable), get_current_size() (IFilesystemNode), and
334 # get_size_of_best_version(IFileNode) are all the same for immutable
# files, so get_size_of_best_version is simply an alias of get_current_size.
336 get_size_of_best_version = get_current_size