6 from zope.interface import implements
7 from twisted.internet import defer
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21 IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
24 class CiphertextFileNode:
25 def __init__(self, verifycap, storage_broker, secret_holder,
27 assert isinstance(verifycap, uri.CHKFileVerifierURI)
28 self._verifycap = verifycap
29 self._storage_broker = storage_broker
30 self._secret_holder = secret_holder
31 self._terminator = terminator
32 self._history = history
33 self._download_status = None
34 self._node = None # created lazily, on read()
36 def _maybe_create_download_node(self):
37 if not self._download_status:
38 ds = DownloadStatus(self._verifycap.storage_index,
41 self._history.add_download(ds)
42 self._download_status = ds
43 if self._node is None:
44 self._node = DownloadNode(self._verifycap, self._storage_broker,
47 self._history, self._download_status)
49 def read(self, consumer, offset=0, size=None):
50 """I am the main entry point, from which FileNode.read() can get
51 data. I feed the consumer with the desired range of ciphertext. I
52 return a Deferred that fires (with the consumer) when the read is
54 self._maybe_create_download_node()
55 return self._node.read(consumer, offset, size)
57 def get_segment(self, segnum):
58 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
59 Deferred that fires with (offset,data) when the desired segment is
60 available, and c is an object on which c.cancel() can be called to
61 disavow interest in the segment (after which 'd' will never fire).
63 You probably need to know the segment size before calling this,
64 unless you want the first few bytes of the file. If you ask for a
65 segment number which turns out to be too large, the Deferred will
66 errback with BadSegmentNumberError.
68 The Deferred fires with the offset of the first byte of the data
69 segment, so that you can call get_segment() before knowing the
70 segment size, and still know which data you received.
72 self._maybe_create_download_node()
73 return self._node.get_segment(segnum)
75 def get_segment_size(self):
76 # return a Deferred that fires with the file's real segment size
77 self._maybe_create_download_node()
78 return self._node.get_segsize()
80 def get_storage_index(self):
81 return self._verifycap.storage_index
82 def get_verify_cap(self):
83 return self._verifycap
85 return self._verifycap.size
87 def raise_error(self):
91 def check_and_repair(self, monitor, verify=False, add_lease=False):
92 verifycap = self._verifycap
93 storage_index = verifycap.storage_index
94 sb = self._storage_broker
95 servers = sb.get_connected_servers()
96 sh = self._secret_holder
98 c = Checker(verifycap=verifycap, servers=servers,
99 verify=verify, add_lease=add_lease, secret_holder=sh,
102 def _maybe_repair(cr):
103 crr = CheckAndRepairResults(storage_index)
104 crr.pre_repair_results = cr
106 crr.post_repair_results = cr
107 return defer.succeed(crr)
109 crr.repair_attempted = True
110 crr.repair_successful = False # until proven successful
111 def _gather_repair_results(ur):
112 assert IUploadResults.providedBy(ur), ur
113 # clone the cr (check results) to form the basis of the
114 # prr (post-repair results)
115 prr = CheckResults(cr.uri, cr.storage_index)
116 prr.data = copy.deepcopy(cr.data)
118 sm = prr.data['sharemap']
119 assert isinstance(sm, DictOfSets), sm
120 sm.update(ur.sharemap)
121 servers_responding = set(prr.data['servers-responding'])
122 for shnum, serverids in ur.sharemap.items():
123 servers_responding.update(serverids)
124 servers_responding = sorted(servers_responding)
125 prr.data['servers-responding'] = servers_responding
126 prr.data['count-shares-good'] = len(sm)
127 good_hosts = len(reduce(set.union, sm.itervalues(), set()))
128 prr.data['count-good-share-hosts'] = good_hosts
129 is_healthy = bool(len(sm) >= verifycap.total_shares)
130 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
131 prr.set_healthy(is_healthy)
132 prr.set_recoverable(is_recoverable)
133 crr.repair_successful = is_healthy
134 prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
136 crr.post_repair_results = prr
138 def _repair_error(f):
139 # as with mutable repair, I'm not sure if I want to pass
140 # through a failure or not. TODO
141 crr.repair_successful = False
142 crr.repair_failure = f
144 r = Repairer(self, storage_broker=sb, secret_holder=sh,
147 d.addCallbacks(_gather_repair_results, _repair_error)
150 d.addCallback(_maybe_repair)
153 def check(self, monitor, verify=False, add_lease=False):
154 verifycap = self._verifycap
155 sb = self._storage_broker
156 servers = sb.get_connected_servers()
157 sh = self._secret_holder
159 v = Checker(verifycap=verifycap, servers=servers,
160 verify=verify, add_lease=add_lease, secret_holder=sh,
164 class DecryptingConsumer:
165 """I sit between a CiphertextDownloader (which acts as a Producer) and
166 the real Consumer, decrypting everything that passes by. The real
167 Consumer sees the real Producer, but the Producer sees us instead of the
169 implements(IConsumer, IDownloadStatusHandlingConsumer)
171 def __init__(self, consumer, readkey, offset):
172 self._consumer = consumer
174 self._download_status = None
175 # TODO: pycryptopp CTR-mode needs random-access operations: I want
176 # either a=AES(readkey, offset) or better yet both of:
177 # a=AES(readkey, offset=0)
178 # a.process(ciphertext, offset=xyz)
179 # For now, we fake it with the existing iv= argument.
180 offset_big = offset // 16
181 offset_small = offset % 16
182 iv = binascii.unhexlify("%032x" % offset_big)
183 self._decryptor = AES(readkey, iv=iv)
184 self._decryptor.process("\x00"*offset_small)
186 def set_download_status_read_event(self, read_ev):
187 self._read_ev = read_ev
188 def set_download_status(self, ds):
189 self._download_status = ds
191 def registerProducer(self, producer, streaming):
192 # this passes through, so the real consumer can flow-control the real
193 # producer. Therefore we don't need to provide any IPushProducer
194 # methods. We implement all the IConsumer methods as pass-throughs,
195 # and only intercept write() to perform decryption.
196 self._consumer.registerProducer(producer, streaming)
197 def unregisterProducer(self):
198 self._consumer.unregisterProducer()
199 def write(self, ciphertext):
201 plaintext = self._decryptor.process(ciphertext)
203 elapsed = now() - started
204 self._read_ev.update(0, elapsed, 0)
205 if self._download_status:
206 self._download_status.add_misc_event("AES", started, now())
207 self._consumer.write(plaintext)
209 class ImmutableFileNode:
210 implements(IImmutableFileNode)
212 # I wrap a CiphertextFileNode with a decryption key
213 def __init__(self, filecap, storage_broker, secret_holder, terminator,
215 assert isinstance(filecap, uri.CHKFileURI)
216 verifycap = filecap.get_verify_cap()
217 self._cnode = CiphertextFileNode(verifycap, storage_broker,
218 secret_holder, terminator, history)
219 assert isinstance(filecap, uri.CHKFileURI)
221 self._readkey = filecap.key
223 # TODO: I'm not sure about this.. what's the use case for node==node? If
224 # we keep it here, we should also put this on CiphertextFileNode
226 return self.u.__hash__()
227 def __eq__(self, other):
228 if isinstance(other, ImmutableFileNode):
229 return self.u.__eq__(other.u)
232 def __ne__(self, other):
233 if isinstance(other, ImmutableFileNode):
234 return self.u.__eq__(other.u)
238 def read(self, consumer, offset=0, size=None):
239 decryptor = DecryptingConsumer(consumer, self._readkey, offset)
240 d = self._cnode.read(decryptor, offset, size)
241 d.addCallback(lambda dc: consumer)
244 def raise_error(self):
247 def get_write_uri(self):
250 def get_readonly_uri(self):
251 return self.get_uri()
254 return self.u.to_string()
257 def get_readcap(self):
258 return self.u.get_readonly()
259 def get_verify_cap(self):
260 return self.u.get_verify_cap()
261 def get_repair_cap(self):
262 # CHK files can be repaired with just the verifycap
263 return self.u.get_verify_cap()
265 def get_storage_index(self):
266 return self.u.get_storage_index()
269 return self.u.get_size()
270 def get_current_size(self):
271 return defer.succeed(self.get_size())
273 def is_mutable(self):
276 def is_readonly(self):
279 def is_unknown(self):
282 def is_allowed_in_immutable_directory(self):
285 def check_and_repair(self, monitor, verify=False, add_lease=False):
286 return self._cnode.check_and_repair(monitor, verify, add_lease)
287 def check(self, monitor, verify=False, add_lease=False):
288 return self._cnode.check(monitor, verify, add_lease)
290 def get_best_readable_version(self):
292 Return an IReadable of the best version of this file. Since
293 immutable files can have only one version, we just return the
296 return defer.succeed(self)
299 def download_best_version(self):
301 Download the best version of this file, returning its contents
302 as a bytestring. Since there is only one version of an immutable
303 file, we download and return the contents of this file.
305 d = consumer.download_to_data(self)
308 # for an immutable file, download_to_data (specified in IReadable)
309 # is the same as download_best_version (specified in IFileNode). For
310 # mutable files, the difference is more meaningful, since they can
311 # have multiple versions.
312 download_to_data = download_best_version
315 # get_size() (IReadable), get_current_size() (IFilesystemNode), and
316 # get_size_of_best_version(IFileNode) are all the same for immutable
318 get_size_of_best_version = get_current_size