5 from zope.interface import implements
6 from twisted.internet import defer
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20 IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
class CiphertextFileNode:
    """I represent the ciphertext of one immutable (CHK) file.

    I am built from a verify-cap and hold no read key, so I deal only in
    ciphertext: reading ranges/segments, checking, and repairing. My
    DownloadStatus and DownloadNode are created lazily, the first time a
    download-related method is called.

    NOTE(review): this class was restored from a listing that had lost its
    indentation and a number of lines. Every statement that had to be
    reconstructed is tagged NOTE(review) below; confirm those against the
    upstream file before relying on them.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        """Remember my collaborators; no network activity happens here.

        verifycap must be a uri.CHKFileVerifierURI. The remaining arguments
        are stored and later handed to DownloadNode / Checker / Repairer.
        """
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create my DownloadStatus and DownloadNode if they do not exist."""
        if not self._download_status:
            # NOTE(review): second argument reconstructed -- confirm.
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            # NOTE(review): guard reconstructed; presumably history may be
            # None -- confirm.
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            # NOTE(review): the two middle arguments were reconstructed from
            # the attributes stored in __init__ -- confirm their order.
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in my verify-cap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verify-cap I was created with."""
        return self._verifycap

    # NOTE(review): the 'def' line for this method was missing; the name is
    # inferred from the returned attribute -- confirm.
    def get_size(self):
        """Return the file size recorded in my verify-cap."""
        return self._verifycap.size

    def raise_error(self):
        # NOTE(review): body missing from the listing; assumed to be a
        # no-op -- confirm.
        pass

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check my shares, then repair them if the check says so.

        Returns a Deferred that fires with the result of _maybe_repair().
        """
        # NOTE(review): the 'monitor=' argument, the start() call and the
        # final return were reconstructed -- confirm.
        c = Checker(verifycap=self._verifycap,
                    servers=self._storage_broker.get_connected_servers(),
                    verify=verify, add_lease=add_lease,
                    secret_holder=self._secret_holder,
                    monitor=monitor)
        d = c.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Turn check results 'cr' into CheckAndRepairResults.

        Returns a Deferred. When no repair is attempted, the pre- and
        post-repair results are the same object.
        """
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        # NOTE(review): the condition guarding this early return was
        # missing; reconstructed as a health test -- confirm.
        if cr.is_healthy():
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = f
            # NOTE(review): reconstructed; passes the failure through --
            # confirm.
            return f
        # NOTE(review): the 'monitor=' argument, the start() call and the
        # final return were reconstructed -- confirm.
        r = Repairer(self, storage_broker=self._storage_broker,
                     secret_holder=self._secret_holder,
                     monitor=monitor)
        d = r.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Fill in crr.post_repair_results after a repair upload.

        The post-repair sharemap is the union of the pre-repair sharemap
        (from 'cr') and the shares placed by the repair (from 'ur').
        """
        assert IUploadResults.providedBy(ur), ur
        # clone the cr (check results) to form the basis of the
        # prr (post-repair results)
        prr = CheckResults(cr.uri, cr.storage_index)

        verifycap = self._verifycap
        servers_responding = set(cr.get_servers_responding())
        # NOTE(review): this assignment was missing; a DictOfSets is
        # implied by the .add(shnum, serverid) calls below -- confirm.
        sm = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        for shnum, serverids in cr.get_sharemap().items():
            for serverid in serverids:
                sm.add(shnum, serverid)
        for shnum, servers in ur.get_sharemap().items():
            for s in servers:
                sm.add(shnum, s.get_serverid())
                servers_responding.add(s.get_serverid())
        servers_responding = sorted(servers_responding)

        # Number of distinct servers holding at least one share. Written
        # with set().union(*...) so it does not depend on the Python-2-only
        # builtin reduce().
        good_hosts = len(set().union(*sm.values()))
        is_healthy = bool(len(sm) >= verifycap.total_shares)
        is_recoverable = bool(len(sm) >= verifycap.needed_shares)
        # NOTE(review): the call these keyword arguments belong to was
        # missing, as was the 'sharemap=' argument -- confirm both.
        prr.set_data(
            count_shares_needed=verifycap.needed_shares,
            count_shares_expected=verifycap.total_shares,
            count_shares_good=len(sm),
            count_good_share_hosts=good_hosts,
            count_recoverable_versions=int(is_recoverable),
            count_unrecoverable_versions=int(not is_recoverable),
            servers_responding=list(servers_responding),
            sharemap=sm,
            count_wrong_shares=0, # no such thing as wrong, for immutable
            list_corrupt_shares=cr.get_corrupt_shares(),
            count_corrupt_shares=len(cr.get_corrupt_shares()),
            list_incompatible_shares=cr.get_incompatible_shares(),
            count_incompatible_shares=len(cr.get_incompatible_shares()),
            )
        prr.set_healthy(is_healthy)
        prr.set_recoverable(is_recoverable)
        crr.repair_successful = is_healthy
        # NOTE(review): this is the same expression as is_healthy, so a
        # healthy file is reported as needing rebalancing. Kept as found;
        # looks suspicious -- confirm the intended condition.
        prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

        crr.post_repair_results = prr
        # NOTE(review): return reconstructed -- confirm.
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check my shares on the currently connected servers."""
        verifycap = self._verifycap
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        # NOTE(review): the 'monitor=' argument and the returned start()
        # call were reconstructed -- confirm.
        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real Consumer.

    NOTE(review): this class was restored from a listing that had lost its
    indentation and a few lines; reconstructed statements are tagged
    NOTE(review) below -- confirm them against the upstream file.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """Prepare a decryptor positioned at byte 'offset' of the file.

        consumer: the real consumer that will receive plaintext.
        readkey: the AES key handed to pycryptopp's AES().
        offset: byte offset of the first ciphertext byte I will be given.
        """
        self._consumer = consumer
        # NOTE(review): initialisation reconstructed; write() tests this
        # attribute before set_download_status_read_event() may have been
        # called -- confirm.
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        offset_big = offset // 16    # index of the 16-byte block
        offset_small = offset % 16   # position inside that block
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        # Process and discard offset_small bytes so the decryptor is
        # positioned at 'offset' when it starts mid-block.
        self._decryptor.process("\x00"*offset_small)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() reports timing to."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download status that write() logs AES events to."""
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext' and pass the plaintext to the real consumer.

        The time spent in the decryptor is reported to the read event and
        to the download status, when those have been provided.
        """
        # NOTE(review): this assignment was missing although 'started' is
        # used below -- confirm.
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        # NOTE(review): guard reconstructed by analogy with the
        # _download_status guard below -- confirm.
        if self._read_ev:
            elapsed = now() - started
            self._read_ev.update(0, elapsed, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", started, now())
        self._consumer.write(plaintext)
class ImmutableFileNode:
    """I wrap a CiphertextFileNode with a decryption key.

    Reads are served by the wrapped ciphertext node through a
    DecryptingConsumer; check and repair are delegated to it unchanged.

    NOTE(review): this class was restored from a listing that had lost its
    indentation and a number of lines; reconstructed statements are tagged
    NOTE(review) below -- confirm them against the upstream file.
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        """filecap must be a uri.CHKFileURI; the other arguments are passed
        straight through to CiphertextFileNode."""
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        # NOTE(review): assignment reconstructed; every accessor below reads
        # self.u and filecap is the only cap available here -- confirm.
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    # NOTE(review): the 'def' line was missing; the name is inferred from
    # the body -- confirm.
    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        """Two ImmutableFileNodes are equal when their caps are equal."""
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        # NOTE(review): fallback reconstructed -- confirm.
        return False

    def __ne__(self, other):
        """Exact negation of __eq__.

        The previous code returned the result of the equality test itself,
        so two equal nodes compared as "not equal".
        """
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with plaintext for the requested range.

        Returns the Deferred from the ciphertext node's read(), with its
        result replaced by the caller's consumer rather than my internal
        DecryptingConsumer.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        # NOTE(review): return reconstructed -- confirm.
        return d

    def raise_error(self):
        # NOTE(review): body missing from the listing; assumed to be a
        # no-op -- confirm.
        pass

    def get_write_uri(self):
        # NOTE(review): body missing from the listing; assumed None since
        # this node is immutable -- confirm.
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    # NOTE(review): the 'def' line was missing; the name is inferred from
    # the get_uri() call in get_readonly_uri() -- confirm.
    def get_uri(self):
        return self.u.to_string()

    def get_readcap(self):
        return self.u.get_readonly()

    def get_verify_cap(self):
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    # NOTE(review): the 'def' line was missing; the name is inferred from
    # the get_size() call in get_current_size() -- confirm.
    def get_size(self):
        return self.u.get_size()

    def get_current_size(self):
        """Return a Deferred that has already fired with get_size()."""
        return defer.succeed(self.get_size())

    # NOTE(review): the bodies of the four predicates below were missing
    # from the listing; the values are what an immutable, known file node
    # is presumed to answer -- confirm each one.
    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)

    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        # NOTE(review): return reconstructed -- confirm.
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version

    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size