6 from zope.interface import implements
7 from twisted.internet import defer
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from twisted.protocols import basic
12 from foolscap.api import eventually
13 from allmydata.interfaces import IImmutableFileNode, ICheckable, \
14 IDownloadTarget, IUploadResults
15 from allmydata.util import dictutil, log, base32, consumer
16 from allmydata.immutable.checker import Checker
17 from allmydata.check_results import CheckResults, CheckAndRepairResults
18 from allmydata.util.dictutil import DictOfSets
19 from pycryptopp.cipher.aes import AES
22 from allmydata.immutable.checker import Checker
23 from allmydata.immutable.repairer import Repairer
24 from allmydata.immutable.downloader.node import DownloadNode, \
25 IDownloadStatusHandlingConsumer
26 from allmydata.immutable.downloader.status import DownloadStatus
28 class CiphertextFileNode:
29 def __init__(self, verifycap, storage_broker, secret_holder,
31 assert isinstance(verifycap, uri.CHKFileVerifierURI)
32 self._verifycap = verifycap
33 self._storage_broker = storage_broker
34 self._secret_holder = secret_holder
35 self._terminator = terminator
36 self._history = history
37 self._download_status = None
38 self._node = None # created lazily, on read()
40 def _maybe_create_download_node(self):
41 if not self._download_status:
42 ds = DownloadStatus(self._verifycap.storage_index,
45 self._history.add_download(ds)
46 self._download_status = ds
47 if self._node is None:
48 self._node = DownloadNode(self._verifycap, self._storage_broker,
51 self._history, self._download_status)
53 def read(self, consumer, offset=0, size=None):
54 """I am the main entry point, from which FileNode.read() can get
55 data. I feed the consumer with the desired range of ciphertext. I
56 return a Deferred that fires (with the consumer) when the read is
58 self._maybe_create_download_node()
59 return self._node.read(consumer, offset, size)
61 def get_segment(self, segnum):
62 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
63 Deferred that fires with (offset,data) when the desired segment is
64 available, and c is an object on which c.cancel() can be called to
65 disavow interest in the segment (after which 'd' will never fire).
67 You probably need to know the segment size before calling this,
68 unless you want the first few bytes of the file. If you ask for a
69 segment number which turns out to be too large, the Deferred will
70 errback with BadSegmentNumberError.
72 The Deferred fires with the offset of the first byte of the data
73 segment, so that you can call get_segment() before knowing the
74 segment size, and still know which data you received.
76 self._maybe_create_download_node()
77 return self._node.get_segment(segnum)
79 def get_segment_size(self):
80 # return a Deferred that fires with the file's real segment size
81 self._maybe_create_download_node()
82 return self._node.get_segsize()
84 def get_storage_index(self):
85 return self._verifycap.storage_index
86 def get_verify_cap(self):
87 return self._verifycap
89 return self._verifycap.size
91 def raise_error(self):
95 def check_and_repair(self, monitor, verify=False, add_lease=False):
96 verifycap = self._verifycap
97 storage_index = verifycap.storage_index
98 sb = self._storage_broker
99 servers = sb.get_connected_servers()
100 sh = self._secret_holder
102 c = Checker(verifycap=verifycap, servers=servers,
103 verify=verify, add_lease=add_lease, secret_holder=sh,
106 def _maybe_repair(cr):
107 crr = CheckAndRepairResults(storage_index)
108 crr.pre_repair_results = cr
110 crr.post_repair_results = cr
111 return defer.succeed(crr)
113 crr.repair_attempted = True
114 crr.repair_successful = False # until proven successful
115 def _gather_repair_results(ur):
116 assert IUploadResults.providedBy(ur), ur
117 # clone the cr (check results) to form the basis of the
118 # prr (post-repair results)
119 prr = CheckResults(cr.uri, cr.storage_index)
120 prr.data = copy.deepcopy(cr.data)
122 sm = prr.data['sharemap']
123 assert isinstance(sm, DictOfSets), sm
124 sm.update(ur.sharemap)
125 servers_responding = set(prr.data['servers-responding'])
126 servers_responding.union(ur.sharemap.iterkeys())
127 prr.data['servers-responding'] = list(servers_responding)
128 prr.data['count-shares-good'] = len(sm)
129 prr.data['count-good-share-hosts'] = len(sm)
130 is_healthy = bool(len(sm) >= verifycap.total_shares)
131 is_recoverable = bool(len(sm) >= verifycap.needed_shares)
132 prr.set_healthy(is_healthy)
133 prr.set_recoverable(is_recoverable)
134 crr.repair_successful = is_healthy
135 prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
137 crr.post_repair_results = prr
139 def _repair_error(f):
140 # as with mutable repair, I'm not sure if I want to pass
141 # through a failure or not. TODO
142 crr.repair_successful = False
143 crr.repair_failure = f
145 r = Repairer(self, storage_broker=sb, secret_holder=sh,
148 d.addCallbacks(_gather_repair_results, _repair_error)
151 d.addCallback(_maybe_repair)
154 def check(self, monitor, verify=False, add_lease=False):
155 verifycap = self._verifycap
156 sb = self._storage_broker
157 servers = sb.get_connected_servers()
158 sh = self._secret_holder
160 v = Checker(verifycap=verifycap, servers=servers,
161 verify=verify, add_lease=add_lease, secret_holder=sh,
165 class DecryptingConsumer:
166 """I sit between a CiphertextDownloader (which acts as a Producer) and
167 the real Consumer, decrypting everything that passes by. The real
168 Consumer sees the real Producer, but the Producer sees us instead of the
170 implements(IConsumer, IDownloadStatusHandlingConsumer)
172 def __init__(self, consumer, readkey, offset):
173 self._consumer = consumer
175 # TODO: pycryptopp CTR-mode needs random-access operations: I want
176 # either a=AES(readkey, offset) or better yet both of:
177 # a=AES(readkey, offset=0)
178 # a.process(ciphertext, offset=xyz)
179 # For now, we fake it with the existing iv= argument.
180 offset_big = offset // 16
181 offset_small = offset % 16
182 iv = binascii.unhexlify("%032x" % offset_big)
183 self._decryptor = AES(readkey, iv=iv)
184 self._decryptor.process("\x00"*offset_small)
186 def set_download_status_read_event(self, read_ev):
187 self._read_ev = read_ev
189 def registerProducer(self, producer, streaming):
190 # this passes through, so the real consumer can flow-control the real
191 # producer. Therefore we don't need to provide any IPushProducer
192 # methods. We implement all the IConsumer methods as pass-throughs,
193 # and only intercept write() to perform decryption.
194 self._consumer.registerProducer(producer, streaming)
195 def unregisterProducer(self):
196 self._consumer.unregisterProducer()
197 def write(self, ciphertext):
199 plaintext = self._decryptor.process(ciphertext)
201 elapsed = now() - started
202 self._read_ev.update(0, elapsed, 0)
203 self._consumer.write(plaintext)
205 class ImmutableFileNode:
206 implements(IImmutableFileNode)
208 # I wrap a CiphertextFileNode with a decryption key
209 def __init__(self, filecap, storage_broker, secret_holder, terminator,
211 assert isinstance(filecap, uri.CHKFileURI)
212 verifycap = filecap.get_verify_cap()
213 self._cnode = CiphertextFileNode(verifycap, storage_broker,
214 secret_holder, terminator, history)
215 assert isinstance(filecap, uri.CHKFileURI)
217 self._readkey = filecap.key
219 # TODO: I'm not sure about this.. what's the use case for node==node? If
220 # we keep it here, we should also put this on CiphertextFileNode
222 return self.u.__hash__()
223 def __eq__(self, other):
224 if isinstance(other, ImmutableFileNode):
225 return self.u.__eq__(other.u)
228 def __ne__(self, other):
229 if isinstance(other, ImmutableFileNode):
230 return self.u.__eq__(other.u)
234 def read(self, consumer, offset=0, size=None):
235 decryptor = DecryptingConsumer(consumer, self._readkey, offset)
236 d = self._cnode.read(decryptor, offset, size)
237 d.addCallback(lambda dc: consumer)
240 def raise_error(self):
243 def get_write_uri(self):
246 def get_readonly_uri(self):
247 return self.get_uri()
250 return self.u.to_string()
253 def get_readcap(self):
254 return self.u.get_readonly()
255 def get_verify_cap(self):
256 return self.u.get_verify_cap()
257 def get_repair_cap(self):
258 # CHK files can be repaired with just the verifycap
259 return self.u.get_verify_cap()
261 def get_storage_index(self):
262 return self.u.get_storage_index()
265 return self.u.get_size()
266 def get_current_size(self):
267 return defer.succeed(self.get_size())
269 def is_mutable(self):
272 def is_readonly(self):
275 def is_unknown(self):
278 def is_allowed_in_immutable_directory(self):
281 def check_and_repair(self, monitor, verify=False, add_lease=False):
282 return self._cnode.check_and_repair(monitor, verify, add_lease)
283 def check(self, monitor, verify=False, add_lease=False):
284 return self._cnode.check(monitor, verify, add_lease)
286 def get_best_readable_version(self):
288 Return an IReadable of the best version of this file. Since
289 immutable files can have only one version, we just return the
292 return defer.succeed(self)
295 def download_best_version(self):
297 Download the best version of this file, returning its contents
298 as a bytestring. Since there is only one version of an immutable
299 file, we download and return the contents of this file.
301 d = consumer.download_to_data(self)
304 # for an immutable file, download_to_data (specified in IReadable)
305 # is the same as download_best_version (specified in IFileNode). For
306 # mutable files, the difference is more meaningful, since they can
307 # have multiple versions.
308 download_to_data = download_best_version
311 # get_size() (IReadable), get_current_size() (IFilesystemNode), and
312 # get_size_of_best_version(IFileNode) are all the same for immutable
314 get_size_of_best_version = get_current_size