5 from zope.interface import implements
6 from twisted.internet import defer
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from allmydata.util.happinessutil import servers_of_happiness
15 from pycryptopp.cipher.aes import AES
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21 IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
24 class CiphertextFileNode:
25 def __init__(self, verifycap, storage_broker, secret_holder,
27 assert isinstance(verifycap, uri.CHKFileVerifierURI)
28 self._verifycap = verifycap
29 self._storage_broker = storage_broker
30 self._secret_holder = secret_holder
31 self._terminator = terminator
32 self._history = history
33 self._download_status = None
34 self._node = None # created lazily, on read()
36 def _maybe_create_download_node(self):
37 if not self._download_status:
38 ds = DownloadStatus(self._verifycap.storage_index,
41 self._history.add_download(ds)
42 self._download_status = ds
43 if self._node is None:
44 self._node = DownloadNode(self._verifycap, self._storage_broker,
47 self._history, self._download_status)
49 def read(self, consumer, offset=0, size=None):
50 """I am the main entry point, from which FileNode.read() can get
51 data. I feed the consumer with the desired range of ciphertext. I
52 return a Deferred that fires (with the consumer) when the read is
54 self._maybe_create_download_node()
55 return self._node.read(consumer, offset, size)
57 def get_segment(self, segnum):
58 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
59 Deferred that fires with (offset,data) when the desired segment is
60 available, and c is an object on which c.cancel() can be called to
61 disavow interest in the segment (after which 'd' will never fire).
63 You probably need to know the segment size before calling this,
64 unless you want the first few bytes of the file. If you ask for a
65 segment number which turns out to be too large, the Deferred will
66 errback with BadSegmentNumberError.
68 The Deferred fires with the offset of the first byte of the data
69 segment, so that you can call get_segment() before knowing the
70 segment size, and still know which data you received.
72 self._maybe_create_download_node()
73 return self._node.get_segment(segnum)
75 def get_segment_size(self):
76 # return a Deferred that fires with the file's real segment size
77 self._maybe_create_download_node()
78 return self._node.get_segsize()
80 def get_storage_index(self):
81 return self._verifycap.storage_index
82 def get_verify_cap(self):
83 return self._verifycap
85 return self._verifycap.size
87 def raise_error(self):
93 def check_and_repair(self, monitor, verify=False, add_lease=False):
94 c = Checker(verifycap=self._verifycap,
95 servers=self._storage_broker.get_connected_servers(),
96 verify=verify, add_lease=add_lease,
97 secret_holder=self._secret_holder,
100 d.addCallback(self._maybe_repair, monitor)
103 def _maybe_repair(self, cr, monitor):
104 crr = CheckAndRepairResults(self._verifycap.storage_index)
105 crr.pre_repair_results = cr
107 crr.post_repair_results = cr
108 return defer.succeed(crr)
110 crr.repair_attempted = True
111 crr.repair_successful = False # until proven successful
112 def _repair_error(f):
113 # as with mutable repair, I'm not sure if I want to pass
114 # through a failure or not. TODO
115 crr.repair_successful = False
116 crr.repair_failure = f
118 r = Repairer(self, storage_broker=self._storage_broker,
119 secret_holder=self._secret_holder,
122 d.addCallbacks(self._gather_repair_results, _repair_error,
123 callbackArgs=(cr, crr,))
126 def _gather_repair_results(self, ur, cr, crr):
127 assert IUploadResults.providedBy(ur), ur
128 # clone the cr (check results) to form the basis of the
129 # prr (post-repair results)
131 verifycap = self._verifycap
132 servers_responding = set(cr.get_servers_responding())
134 assert isinstance(cr.get_sharemap(), DictOfSets)
135 for shnum, servers in cr.get_sharemap().items():
136 for server in servers:
137 sm.add(shnum, server)
138 for shnum, servers in ur.get_sharemap().items():
139 for server in servers:
140 sm.add(shnum, server)
141 servers_responding.add(server)
142 servers_responding = sorted(servers_responding)
144 good_hosts = len(reduce(set.union, sm.values(), set()))
145 is_healthy = bool(len(sm) >= verifycap.total_shares)
146 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
148 count_happiness = servers_of_happiness(sm)
150 prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
151 healthy=is_healthy, recoverable=is_recoverable,
152 count_happiness=count_happiness,
153 count_shares_needed=verifycap.needed_shares,
154 count_shares_expected=verifycap.total_shares,
155 count_shares_good=len(sm),
156 count_good_share_hosts=good_hosts,
157 count_recoverable_versions=int(is_recoverable),
158 count_unrecoverable_versions=int(not is_recoverable),
159 servers_responding=list(servers_responding),
161 count_wrong_shares=0, # no such thing as wrong, for immutable
162 list_corrupt_shares=cr.get_corrupt_shares(),
163 count_corrupt_shares=len(cr.get_corrupt_shares()),
164 list_incompatible_shares=cr.get_incompatible_shares(),
165 count_incompatible_shares=len(cr.get_incompatible_shares()),
170 crr.repair_successful = is_healthy
171 crr.post_repair_results = prr
174 def check(self, monitor, verify=False, add_lease=False):
175 verifycap = self._verifycap
176 sb = self._storage_broker
177 servers = sb.get_connected_servers()
178 sh = self._secret_holder
180 v = Checker(verifycap=verifycap, servers=servers,
181 verify=verify, add_lease=add_lease, secret_holder=sh,
185 class DecryptingConsumer:
186 """I sit between a CiphertextDownloader (which acts as a Producer) and
187 the real Consumer, decrypting everything that passes by. The real
188 Consumer sees the real Producer, but the Producer sees us instead of the
190 implements(IConsumer, IDownloadStatusHandlingConsumer)
192 def __init__(self, consumer, readkey, offset):
193 self._consumer = consumer
195 self._download_status = None
196 # TODO: pycryptopp CTR-mode needs random-access operations: I want
197 # either a=AES(readkey, offset) or better yet both of:
198 # a=AES(readkey, offset=0)
199 # a.process(ciphertext, offset=xyz)
200 # For now, we fake it with the existing iv= argument.
201 offset_big = offset // 16
202 offset_small = offset % 16
203 iv = binascii.unhexlify("%032x" % offset_big)
204 self._decryptor = AES(readkey, iv=iv)
205 self._decryptor.process("\x00"*offset_small)
207 def set_download_status_read_event(self, read_ev):
208 self._read_ev = read_ev
209 def set_download_status(self, ds):
210 self._download_status = ds
212 def registerProducer(self, producer, streaming):
213 # this passes through, so the real consumer can flow-control the real
214 # producer. Therefore we don't need to provide any IPushProducer
215 # methods. We implement all the IConsumer methods as pass-throughs,
216 # and only intercept write() to perform decryption.
217 self._consumer.registerProducer(producer, streaming)
218 def unregisterProducer(self):
219 self._consumer.unregisterProducer()
220 def write(self, ciphertext):
222 plaintext = self._decryptor.process(ciphertext)
224 elapsed = now() - started
225 self._read_ev.update(0, elapsed, 0)
226 if self._download_status:
227 self._download_status.add_misc_event("AES", started, now())
228 self._consumer.write(plaintext)
230 class ImmutableFileNode:
231 implements(IImmutableFileNode)
233 # I wrap a CiphertextFileNode with a decryption key
234 def __init__(self, filecap, storage_broker, secret_holder, terminator,
236 assert isinstance(filecap, uri.CHKFileURI)
237 verifycap = filecap.get_verify_cap()
238 self._cnode = CiphertextFileNode(verifycap, storage_broker,
239 secret_holder, terminator, history)
240 assert isinstance(filecap, uri.CHKFileURI)
242 self._readkey = filecap.key
244 # TODO: I'm not sure about this.. what's the use case for node==node? If
245 # we keep it here, we should also put this on CiphertextFileNode
247 return self.u.__hash__()
248 def __eq__(self, other):
249 if isinstance(other, ImmutableFileNode):
250 return self.u.__eq__(other.u)
253 def __ne__(self, other):
254 if isinstance(other, ImmutableFileNode):
255 return self.u.__eq__(other.u)
259 def read(self, consumer, offset=0, size=None):
260 decryptor = DecryptingConsumer(consumer, self._readkey, offset)
261 d = self._cnode.read(decryptor, offset, size)
262 d.addCallback(lambda dc: consumer)
265 def raise_error(self):
268 def get_write_uri(self):
271 def get_readonly_uri(self):
272 return self.get_uri()
275 return self.u.to_string()
278 def get_readcap(self):
279 return self.u.get_readonly()
280 def get_verify_cap(self):
281 return self.u.get_verify_cap()
282 def get_repair_cap(self):
283 # CHK files can be repaired with just the verifycap
284 return self.u.get_verify_cap()
286 def get_storage_index(self):
287 return self.u.get_storage_index()
290 return self.u.get_size()
291 def get_current_size(self):
292 return defer.succeed(self.get_size())
294 def is_mutable(self):
297 def is_readonly(self):
300 def is_unknown(self):
303 def is_allowed_in_immutable_directory(self):
306 def check_and_repair(self, monitor, verify=False, add_lease=False):
307 return self._cnode.check_and_repair(monitor, verify, add_lease)
308 def check(self, monitor, verify=False, add_lease=False):
309 return self._cnode.check(monitor, verify, add_lease)
311 def get_best_readable_version(self):
313 Return an IReadable of the best version of this file. Since
314 immutable files can have only one version, we just return the
317 return defer.succeed(self)
320 def download_best_version(self):
322 Download the best version of this file, returning its contents
323 as a bytestring. Since there is only one version of an immutable
324 file, we download and return the contents of this file.
326 d = consumer.download_to_data(self)
329 # for an immutable file, download_to_data (specified in IReadable)
330 # is the same as download_best_version (specified in IFileNode). For
331 # mutable files, the difference is more meaningful, since they can
332 # have multiple versions.
333 download_to_data = download_best_version
336 # get_size() (IReadable), get_current_size() (IFilesystemNode), and
337 # get_size_of_best_version(IFileNode) are all the same for immutable
339 get_size_of_best_version = get_current_size