]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
Add comments and a caveat in webapi.rst indicating that
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import time
4 now = time.time
5 from zope.interface import implements
6 from twisted.internet import defer
7
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20      IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
22
class CiphertextFileNode:
    """Ciphertext-only node for an immutable (CHK) file.

    I am built from a CHK verify-cap, so I can download, check and repair
    the file's ciphertext, but I hold no read key and never decrypt.
    ImmutableFileNode wraps me and supplies the decryption.
    """
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and DownloadNode on first use.

        The DownloadStatus is handed to the history object (when one was
        given). Both objects are cached on self and reused by later calls.
        """
        if not self._download_status:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        """Return the file size recorded in the verify-cap."""
        return self._verifycap.size

    def raise_error(self):
        # Intentionally a no-op for this node type.
        pass

    def is_mutable(self):
        return False

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file and repair it if the check says it is unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults, or
        errbacks if the repair attempt itself fails.
        """
        c = Checker(verifycap=self._verifycap,
                    servers=self._storage_broker.get_connected_servers(),
                    verify=verify, add_lease=add_lease,
                    secret_holder=self._secret_holder,
                    monitor=monitor)
        d = c.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Wrap pre-repair CheckResults `cr`; run a Repairer if unhealthy."""
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            # Nothing to do: post-repair state is the pre-repair state.
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            # The failure is recorded on crr but still propagated, so the
            # returned Deferred errbacks instead of firing with crr.
            crr.repair_successful = False
            crr.repair_failure = f
            return f
        r = Repairer(self, storage_broker=self._storage_broker,
                     secret_holder=self._secret_holder,
                     monitor=monitor)
        d = r.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Build post-repair CheckResults from the check and upload results.

        The post-repair share map is the union of the pre-repair share map
        (from `cr`) and the placements the repairer just uploaded (from
        `ur`). Health, recoverability and rebalancing needs are recomputed
        from that merged map. Returns `crr` with post_repair_results and
        repair_successful filled in.
        """
        assert IUploadResults.providedBy(ur), ur
        verifycap = self._verifycap
        servers_responding = set(cr.get_servers_responding())
        sm = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        for shnum, servers in cr.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
        # Servers that accepted shares during repair evidently responded.
        for shnum, servers in ur.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
                servers_responding.add(server)
        servers_responding = sorted(servers_responding)

        # Number of distinct servers holding at least one share.
        good_hosts = len(set().union(*sm.values()))
        # len(sm) is the number of distinct share numbers that are present.
        is_healthy = bool(len(sm) >= verifycap.total_shares)
        is_recoverable = bool(len(sm) >= verifycap.needed_shares)

        # The file needs rebalancing when fewer distinct servers hold shares
        # than there are distinct shares, i.e. some server holds more than
        # one share.
        # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
        needs_rebalancing = bool(good_hosts < len(sm))

        prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
                           healthy=is_healthy, recoverable=is_recoverable,
                           needs_rebalancing=needs_rebalancing,
                           count_shares_needed=verifycap.needed_shares,
                           count_shares_expected=verifycap.total_shares,
                           count_shares_good=len(sm),
                           count_good_share_hosts=good_hosts,
                           count_recoverable_versions=int(is_recoverable),
                           count_unrecoverable_versions=int(not is_recoverable),
                           servers_responding=servers_responding,
                           sharemap=sm,
                           count_wrong_shares=0, # no such thing as wrong, for immutable
                           list_corrupt_shares=cr.get_corrupt_shares(),
                           count_corrupt_shares=len(cr.get_corrupt_shares()),
                           list_incompatible_shares=cr.get_incompatible_shares(),
                           count_incompatible_shares=len(cr.get_incompatible_shares()),
                           summary="",
                           report=[],
                           share_problems=[],
                           servermap=None)
        crr.repair_successful = is_healthy
        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check the file's health without repairing it.

        Returns the Deferred produced by Checker.start().
        """
        verifycap = self._verifycap
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()
184
class DecryptingConsumer:
    """Decrypting pass-through between a ciphertext producer and a consumer.

    The ciphertext downloader (acting as Producer) is given me as its
    consumer. Each chunk it writes is decrypted and forwarded to the real
    Consumer. Producer registration is forwarded untouched, so the real
    Consumer flow-controls the real Producer directly.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """
        @param consumer: the real IConsumer that receives plaintext
        @param readkey: AES key used to decrypt the ciphertext
        @param offset: file offset of the first ciphertext byte that will
            be written to me; the keystream is positioned to match it
        """
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations, ideally
        # AES(readkey, offset) or a.process(ciphertext, offset=xyz). Until
        # then, seed the counter with the index of the 16-byte block that
        # contains `offset`, then burn the keystream for the bytes that
        # precede `offset` inside that block.
        blocknum, skip = divmod(offset, 16)
        counter = binascii.unhexlify("%032x" % blocknum)
        self._decryptor = AES(readkey, iv=counter)
        self._decryptor.process("\x00" * skip)

    def set_download_status_read_event(self, read_ev):
        self._read_ev = read_ev

    def set_download_status(self, ds):
        self._download_status = ds

    # IConsumer methods. Producer (un)registration is a plain pass-through,
    # so no IPushProducer methods are needed here; only write() is
    # intercepted, to perform decryption.
    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt `ciphertext`, record timing, and forward the plaintext."""
        t_start = now()
        plaintext = self._decryptor.process(ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - t_start, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", t_start, now())
        self._consumer.write(plaintext)
229
class ImmutableFileNode:
    """Readable filenode for an immutable (CHK) file.

    I wrap a CiphertextFileNode built from my filecap's verify-cap, and use
    the filecap's read key to decrypt whatever it downloads.
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so __ne__ must be defined
        # explicitly -- and it must be the exact negation of __eq__.
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Deliver decrypted file data to `consumer`.

        Reads ciphertext starting at `offset` (with `size` passed through to
        the ciphertext read) via a DecryptingConsumer, so `consumer` only
        ever sees plaintext. Returns a Deferred that fires with `consumer`.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        # Intentionally a no-op for this node type.
        pass

    def get_write_uri(self):
        # Immutable files have no write capability.
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        """Return a Deferred that fires with get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped CiphertextFileNode."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped CiphertextFileNode."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size