]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
use the new CheckResult getters almost everywhere
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import time
4 now = time.time
5 from zope.interface import implements
6 from twisted.internet import defer
7
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20      IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
22
class CiphertextFileNode:
    """Ciphertext-level access to an immutable (CHK) file.

    I am built from a verify cap. I can feed ciphertext to a consumer
    (through a DownloadNode that I create lazily) and I can run the
    checker and the repairer against the file's shares.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        """Remember my collaborators; nothing is downloaded yet.

        verifycap must be a uri.CHKFileVerifierURI. storage_broker,
        secret_holder, terminator and history are stored and later handed
        to the DownloadNode, Checker and Repairer that I create.
        """
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        # Both of these are filled in by _maybe_create_download_node(),
        # the first time somebody actually wants data.
        self._node = None
        self._download_status = None

    def _maybe_create_download_node(self):
        """Create my DownloadStatus and DownloadNode if they are missing."""
        if not self._download_status:
            status = DownloadStatus(self._verifycap.storage_index,
                                    self._verifycap.size)
            # history is optional: only register the status if we have one
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is not None:
            return
        self._node = DownloadNode(self._verifycap, self._storage_broker,
                                  self._secret_holder, self._terminator,
                                  self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """Feed the requested range of ciphertext to the consumer.

        I am the main entry point from which FileNode.read() gets its
        data. I return a Deferred that fires (with the consumer) once the
        read has finished.
        """
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Start downloading one segment and return a tuple (d, c).

        'd' is a Deferred that fires with (offset, data) when the segment
        is available. 'c' is an object whose c.cancel() disavows interest
        in the segment, after which 'd' will never fire.

        Because 'd' fires with the offset of the first byte of the
        segment, you can call get_segment() before you know the segment
        size and still tell which data you received. Unless you only want
        the first few bytes of the file, though, you probably need the
        segment size first. Asking for a segment number that turns out to
        be too large makes the Deferred errback with
        BadSegmentNumberError.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in my verify cap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verify cap I was created with."""
        return self._verifycap

    def get_size(self):
        """Return the file size recorded in my verify cap."""
        return self._verifycap.size

    def raise_error(self):
        """Do nothing; this method is a no-op here."""
        pass

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file and repair it if the check says it is unhealthy.

        Returns the Deferred produced by the Checker, with
        _maybe_repair() chained onto it.
        """
        checker = Checker(
            verifycap=self._verifycap,
            servers=self._storage_broker.get_connected_servers(),
            verify=verify,
            add_lease=add_lease,
            secret_holder=self._secret_holder,
            monitor=monitor,
        )
        d = checker.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Wrap check results 'cr' in a CheckAndRepairResults, repairing
        the file first when 'cr' says it is not healthy.

        Always returns a Deferred.
        """
        results = CheckAndRepairResults(self._verifycap.storage_index)
        results.pre_repair_results = cr
        if cr.is_healthy():
            # nothing to fix: the post-repair state is the pre-repair state
            results.post_repair_results = cr
            return defer.succeed(results)

        def _record_failure(failure):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            results.repair_successful = False
            results.repair_failure = failure
            return failure

        results.repair_attempted = True
        results.repair_successful = False  # until proven successful
        repairer = Repairer(self,
                            storage_broker=self._storage_broker,
                            secret_holder=self._secret_holder,
                            monitor=monitor)
        d = repairer.start()
        d.addCallbacks(self._gather_repair_results, _record_failure,
                       callbackArgs=(cr, results))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Build the post-repair CheckResults and attach them to 'crr'.

        'ur' is the repairer's upload results, 'cr' the pre-repair check
        results. The post-repair sharemap is the pre-repair sharemap plus
        every share placement reported by 'ur'. Returns 'crr'.
        """
        assert IUploadResults.providedBy(ur), ur
        # the post-repair results start out as a fresh CheckResults for
        # the same uri / storage index as the pre-repair ones
        post = CheckResults(cr.uri, cr.storage_index)
        cap = self._verifycap

        responders = set(cr.get_servers_responding())
        merged = DictOfSets()

        old_sharemap = cr.get_sharemap()
        assert isinstance(old_sharemap, DictOfSets)
        for shnum, serverids in old_sharemap.items():
            for serverid in serverids:
                merged.add(shnum, serverid)
        # the upload results map shares to server objects rather than to
        # serverids, so convert while merging
        for shnum, servers in ur.get_sharemap().items():
            for server in servers:
                serverid = server.get_serverid()
                merged.add(shnum, serverid)
                responders.add(serverid)

        share_count = len(merged)
        # number of distinct serverids holding at least one share
        host_count = len(set().union(*merged.values()))
        healthy = share_count >= cap.total_shares
        recoverable = share_count >= cap.needed_shares
        corrupt = cr.get_corrupt_shares()
        incompatible = cr.get_incompatible_shares()

        post.set_data(
            count_shares_needed=cap.needed_shares,
            count_shares_expected=cap.total_shares,
            count_shares_good=share_count,
            count_good_share_hosts=host_count,
            count_recoverable_versions=int(recoverable),
            count_unrecoverable_versions=int(not recoverable),
            servers_responding=sorted(responders),
            sharemap=merged,
            count_wrong_shares=0, # no such thing as wrong, for immutable
            list_corrupt_shares=corrupt,
            count_corrupt_shares=len(corrupt),
            list_incompatible_shares=incompatible,
            count_incompatible_shares=len(incompatible),
            )
        post.set_healthy(healthy)
        post.set_recoverable(recoverable)
        crr.repair_successful = healthy
        # NOTE(review): this passes the very same condition that defines
        # 'healthy' above; confirm that this is what "needs rebalancing"
        # is meant to report.
        post.set_needs_rebalancing(share_count >= cap.total_shares)

        crr.post_repair_results = post
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Start a Checker over the currently connected servers and
        return whatever its start() returns."""
        checker = Checker(
            verifycap=self._verifycap,
            servers=self._storage_broker.get_connected_servers(),
            verify=verify,
            add_lease=add_lease,
            secret_holder=self._secret_holder,
            monitor=monitor,
        )
        return checker.start()
178
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (acting as a Producer) and the
    real Consumer, and decrypt everything that flows through me. The real
    Consumer sees the real Producer, but the Producer sees me in place of
    the real Consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """Wrap 'consumer', decrypting with 'readkey' from byte 'offset'."""
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument: the number of
        # whole 16-byte blocks before 'offset' becomes the IV, and the
        # leftover bytes are burned off by processing that many dummy
        # bytes.
        whole_blocks, leftover = divmod(offset, 16)
        start_iv = binascii.unhexlify("%032x" % whole_blocks)
        self._decryptor = AES(readkey, iv=start_iv)
        self._decryptor.process("\x00"*leftover)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() reports timings to."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download status that write() adds events to."""
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        """Hand the producer straight to the real consumer."""
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Pass the unregistration through to the real consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext' and write the plaintext to the real
        consumer, recording how long decryption took if anyone asked."""
        t_start = now()
        cleartext = self._decryptor.process(ciphertext)
        if self._read_ev:
            self._read_ev.update(0, now() - t_start, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", t_start, now())
        self._consumer.write(cleartext)
223
class ImmutableFileNode:
    """An immutable (CHK) file, addressed by a read cap.

    I wrap a CiphertextFileNode with a decryption key: reads are served
    by the ciphertext node through a DecryptingConsumer, and check /
    repair requests are forwarded to the ciphertext node unchanged.
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        """Build the underlying CiphertextFileNode from filecap's verify
        cap and remember filecap and its key.

        filecap must be a uri.CHKFileURI; the other arguments are passed
        through to CiphertextFileNode.
        """
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        """Hash like my cap, so equal nodes hash equally."""
        return self.u.__hash__()

    def __eq__(self, other):
        """Two ImmutableFileNodes are equal when their caps are equal;
        anything else is never equal to me."""
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        """Exact negation of __eq__.

        Defined explicitly because Python 2 does not derive != from ==.
        """
        return not (self == other)

    def read(self, consumer, offset=0, size=None):
        """Deliver plaintext for the requested range to 'consumer'.

        Returns a Deferred that fires with 'consumer' (not with the
        intermediate DecryptingConsumer) when the read is done.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing; this method is a no-op here."""
        pass

    def get_write_uri(self):
        """Return None: this node never has a write cap."""
        return None

    def get_readonly_uri(self):
        """Return the same string as get_uri()."""
        return self.get_uri()

    def get_uri(self):
        """Return my cap in string form."""
        return self.u.to_string()

    def get_cap(self):
        """Return the cap object I was created with."""
        return self.u

    def get_readcap(self):
        """Return the read-only form of my cap."""
        return self.u.get_readonly()

    def get_verify_cap(self):
        """Return the verify cap derived from my cap."""
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        """Return the cap needed to repair this file."""
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return the storage index reported by my cap."""
        return self.u.get_storage_index()

    def get_size(self):
        """Return the file size reported by my cap."""
        return self.u.get_size()

    def get_current_size(self):
        """Return an already-fired Deferred carrying get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        """Always False."""
        return False

    def is_readonly(self):
        """Always True."""
        return True

    def is_unknown(self):
        """Always False."""
        return False

    def is_allowed_in_immutable_directory(self):
        """Always True."""
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Forward to the ciphertext node's check_and_repair()."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        """Forward to the ciphertext node's check()."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)

    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        # 'consumer' here is the allmydata.util.consumer module
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version

    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size