]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
79e282d90e232a2645b56e7b14cf1a765fd463e3
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import time
4 now = time.time
5 from zope.interface import implements
6 from twisted.internet import defer
7
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20      IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
22
class CiphertextFileNode:
    """I give access to an immutable CHK file at the ciphertext level.

    I hold only the verify-cap (no read key). That is enough to download
    ciphertext, check share health, and repair shares. ImmutableFileNode
    wraps me and adds decryption.
    """
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and DownloadNode the first time they are
        needed; later calls reuse the existing objects.

        The DownloadStatus is registered with self._history when a history
        object was supplied."""
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def is_mutable(self):
        return False

    def _make_checker(self, monitor, verify, add_lease):
        """Build a Checker for this file over the currently connected
        servers. This is shared by check() and check_and_repair()."""
        return Checker(verifycap=self._verifycap,
                       servers=self._storage_broker.get_connected_servers(),
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder,
                       monitor=monitor)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file's shares and repair them if they are unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults. If a
        repair is attempted and fails, the Deferred errbacks (see
        _maybe_repair)."""
        d = self._make_checker(monitor, verify, add_lease).start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Given pre-repair CheckResults `cr`, run a Repairer if unhealthy.

        Returns a Deferred firing with a CheckAndRepairResults. A healthy
        file skips repair, and its check results double as the post-repair
        results. On repair failure, the Failure is stored in
        crr.repair_failure and then passed through, so the Deferred errbacks.
        """
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = f
            return f
        r = Repairer(self, storage_broker=self._storage_broker,
                     secret_holder=self._secret_holder,
                     monitor=monitor)
        d = r.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Merge pre-repair check results `cr` with the repairer's upload
        results `ur` into post-repair CheckResults.

        The merged results are stored on `crr` (and crr.repair_successful is
        set from them). Returns `crr`."""
        assert IUploadResults.providedBy(ur), ur
        verifycap = self._verifycap

        # clone the cr (check results) sharemap and merge in the shares the
        # repairer placed, to form the basis of the prr (post-repair results)
        servers_responding = set(cr.get_servers_responding())
        sm = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        for shnum, servers in cr.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
        for shnum, servers in ur.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
                servers_responding.add(server)
        servers_responding = sorted(servers_responding)

        # sm is keyed by share number, so len(sm) counts distinct shares;
        # good_hosts counts distinct servers holding at least one share.
        good_hosts = len(set().union(*sm.values()))
        is_healthy = bool(len(sm) >= verifycap.total_shares)
        is_recoverable = bool(len(sm) >= verifycap.needed_shares)
        # NOTE(review): this is the same test as is_healthy, so it is True
        # exactly when the file is healthy. It probably ought to consider
        # how shares are spread across servers instead -- confirm against
        # the Checker's definition before changing it.
        needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
        prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
                           healthy=is_healthy, recoverable=is_recoverable,
                           needs_rebalancing=needs_rebalancing,
                           count_shares_needed=verifycap.needed_shares,
                           count_shares_expected=verifycap.total_shares,
                           count_shares_good=len(sm),
                           count_good_share_hosts=good_hosts,
                           count_recoverable_versions=int(is_recoverable),
                           count_unrecoverable_versions=int(not is_recoverable),
                           servers_responding=servers_responding,
                           sharemap=sm,
                           count_wrong_shares=0, # no such thing as wrong, for immutable
                           list_corrupt_shares=cr.get_corrupt_shares(),
                           count_corrupt_shares=len(cr.get_corrupt_shares()),
                           list_incompatible_shares=cr.get_incompatible_shares(),
                           count_incompatible_shares=len(cr.get_incompatible_shares()),
                           summary="",
                           report=[],
                           share_problems=[],
                           servermap=None)
        crr.repair_successful = is_healthy
        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check the file's shares without repairing them. Returns the
        Deferred from Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()
181
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """Wrap `consumer`, decrypting with `readkey` starting at byte
        `offset` of the file."""
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        # The counter is started at the 16-byte block containing `offset`
        # (encoded as a 16-byte big-endian value), then the keystream is
        # advanced by the remaining offset % 16 bytes by processing zeros
        # and discarding the output.
        offset_big = offset // 16
        offset_small = offset % 16
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*offset_small)

    def set_download_status_read_event(self, read_ev):
        self._read_ev = read_ev
    def set_download_status(self, ds):
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()
    def write(self, ciphertext):
        """Decrypt `ciphertext`, record the decryption time, and forward the
        plaintext to the real consumer."""
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        # Take one timestamp right after decryption so that the read event
        # and the "AES" misc event report the same interval, and neither
        # includes the status bookkeeping below.
        finished = now()
        if self._read_ev is not None:
            self._read_ev.update(0, finished - started, 0)
        if self._download_status is not None:
            self._download_status.add_misc_event("AES", started, finished)
        self._consumer.write(plaintext)
226
class ImmutableFileNode:
    """I am the node for an immutable CHK file, given its read-cap.

    I wrap a CiphertextFileNode (built from the filecap's verify-cap), which
    handles download, check, and repair. I add the filecap's read key so
    that read() delivers plaintext.
    """
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Must be the exact negation of __eq__: two nodes with equal filecaps
        # compare ==, so they must not also compare !=.
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed `consumer` the plaintext starting at byte `offset`.

        `size` is passed through to CiphertextFileNode.read unchanged
        (presumably None means "to the end of the file" -- confirm with
        DownloadNode.read). Returns a Deferred that fires with `consumer`
        once the read finishes. Note: the `consumer` parameter shadows the
        module-level `consumer` utility import inside this method only."""
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size