]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
7def22c824186ebb1706bcfa65ea021dbe268dfb
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import time
4 now = time.time
5 from zope.interface import implements
6 from twisted.internet import defer
7
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode, \
20      IDownloadStatusHandlingConsumer
21 from allmydata.immutable.downloader.status import DownloadStatus
22
23 class CiphertextFileNode:
24     def __init__(self, verifycap, storage_broker, secret_holder,
25                  terminator, history):
26         assert isinstance(verifycap, uri.CHKFileVerifierURI)
27         self._verifycap = verifycap
28         self._storage_broker = storage_broker
29         self._secret_holder = secret_holder
30         self._terminator = terminator
31         self._history = history
32         self._download_status = None
33         self._node = None # created lazily, on read()
34
35     def _maybe_create_download_node(self):
36         if not self._download_status:
37             ds = DownloadStatus(self._verifycap.storage_index,
38                                 self._verifycap.size)
39             if self._history:
40                 self._history.add_download(ds)
41             self._download_status = ds
42         if self._node is None:
43             self._node = DownloadNode(self._verifycap, self._storage_broker,
44                                       self._secret_holder,
45                                       self._terminator,
46                                       self._history, self._download_status)
47
48     def read(self, consumer, offset=0, size=None):
49         """I am the main entry point, from which FileNode.read() can get
50         data. I feed the consumer with the desired range of ciphertext. I
51         return a Deferred that fires (with the consumer) when the read is
52         finished."""
53         self._maybe_create_download_node()
54         return self._node.read(consumer, offset, size)
55
56     def get_segment(self, segnum):
57         """Begin downloading a segment. I return a tuple (d, c): 'd' is a
58         Deferred that fires with (offset,data) when the desired segment is
59         available, and c is an object on which c.cancel() can be called to
60         disavow interest in the segment (after which 'd' will never fire).
61
62         You probably need to know the segment size before calling this,
63         unless you want the first few bytes of the file. If you ask for a
64         segment number which turns out to be too large, the Deferred will
65         errback with BadSegmentNumberError.
66
67         The Deferred fires with the offset of the first byte of the data
68         segment, so that you can call get_segment() before knowing the
69         segment size, and still know which data you received.
70         """
71         self._maybe_create_download_node()
72         return self._node.get_segment(segnum)
73
74     def get_segment_size(self):
75         # return a Deferred that fires with the file's real segment size
76         self._maybe_create_download_node()
77         return self._node.get_segsize()
78
79     def get_storage_index(self):
80         return self._verifycap.storage_index
81     def get_verify_cap(self):
82         return self._verifycap
83     def get_size(self):
84         return self._verifycap.size
85
86     def raise_error(self):
87         pass
88
89
90     def check_and_repair(self, monitor, verify=False, add_lease=False):
91         c = Checker(verifycap=self._verifycap,
92                     servers=self._storage_broker.get_connected_servers(),
93                     verify=verify, add_lease=add_lease,
94                     secret_holder=self._secret_holder,
95                     monitor=monitor)
96         d = c.start()
97         d.addCallback(self._maybe_repair, monitor)
98         return d
99
100     def _maybe_repair(self, cr, monitor):
101         crr = CheckAndRepairResults(self._verifycap.storage_index)
102         crr.pre_repair_results = cr
103         if cr.is_healthy():
104             crr.post_repair_results = cr
105             return defer.succeed(crr)
106
107         crr.repair_attempted = True
108         crr.repair_successful = False # until proven successful
109         def _repair_error(f):
110             # as with mutable repair, I'm not sure if I want to pass
111             # through a failure or not. TODO
112             crr.repair_successful = False
113             crr.repair_failure = f
114             return f
115         r = Repairer(self, storage_broker=self._storage_broker,
116                      secret_holder=self._secret_holder,
117                      monitor=monitor)
118         d = r.start()
119         d.addCallbacks(self._gather_repair_results, _repair_error,
120                        callbackArgs=(cr, crr,))
121         return d
122
123     def _gather_repair_results(self, ur, cr, crr):
124         assert IUploadResults.providedBy(ur), ur
125         # clone the cr (check results) to form the basis of the
126         # prr (post-repair results)
127
128         verifycap = self._verifycap
129         servers_responding = set(cr.get_servers_responding())
130         sm = DictOfSets()
131         assert isinstance(cr.get_sharemap(), DictOfSets)
132         for shnum, servers in cr.get_sharemap().items():
133             for server in servers:
134                 sm.add(shnum, server)
135         for shnum, servers in ur.get_sharemap().items():
136             for server in servers:
137                 sm.add(shnum, server)
138                 servers_responding.add(server)
139         servers_responding = sorted(servers_responding)
140
141         good_hosts = len(reduce(set.union, sm.values(), set()))
142         is_healthy = bool(len(sm) >= verifycap.total_shares)
143         is_recoverable = bool(len(sm) >= verifycap.needed_shares)
144         needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
145         prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
146                            healthy=is_healthy, recoverable=is_recoverable,
147                            needs_rebalancing=needs_rebalancing,
148                            count_shares_needed=verifycap.needed_shares,
149                            count_shares_expected=verifycap.total_shares,
150                            count_shares_good=len(sm),
151                            count_good_share_hosts=good_hosts,
152                            count_recoverable_versions=int(is_recoverable),
153                            count_unrecoverable_versions=int(not is_recoverable),
154                            servers_responding=list(servers_responding),
155                            sharemap=sm,
156                            count_wrong_shares=0, # no such thing as wrong, for immutable
157                            list_corrupt_shares=cr.get_corrupt_shares(),
158                            count_corrupt_shares=len(cr.get_corrupt_shares()),
159                            list_incompatible_shares=cr.get_incompatible_shares(),
160                            count_incompatible_shares=len(cr.get_incompatible_shares()),
161                            summary="",
162                            report=[],
163                            share_problems=[],
164                            servermap=None)
165         crr.repair_successful = is_healthy
166         crr.post_repair_results = prr
167         return crr
168
169     def check(self, monitor, verify=False, add_lease=False):
170         verifycap = self._verifycap
171         sb = self._storage_broker
172         servers = sb.get_connected_servers()
173         sh = self._secret_holder
174
175         v = Checker(verifycap=verifycap, servers=servers,
176                     verify=verify, add_lease=add_lease, secret_holder=sh,
177                     monitor=monitor)
178         return v.start()
179
180 class DecryptingConsumer:
181     """I sit between a CiphertextDownloader (which acts as a Producer) and
182     the real Consumer, decrypting everything that passes by. The real
183     Consumer sees the real Producer, but the Producer sees us instead of the
184     real consumer."""
185     implements(IConsumer, IDownloadStatusHandlingConsumer)
186
187     def __init__(self, consumer, readkey, offset):
188         self._consumer = consumer
189         self._read_ev = None
190         self._download_status = None
191         # TODO: pycryptopp CTR-mode needs random-access operations: I want
192         # either a=AES(readkey, offset) or better yet both of:
193         #  a=AES(readkey, offset=0)
194         #  a.process(ciphertext, offset=xyz)
195         # For now, we fake it with the existing iv= argument.
196         offset_big = offset // 16
197         offset_small = offset % 16
198         iv = binascii.unhexlify("%032x" % offset_big)
199         self._decryptor = AES(readkey, iv=iv)
200         self._decryptor.process("\x00"*offset_small)
201
202     def set_download_status_read_event(self, read_ev):
203         self._read_ev = read_ev
204     def set_download_status(self, ds):
205         self._download_status = ds
206
207     def registerProducer(self, producer, streaming):
208         # this passes through, so the real consumer can flow-control the real
209         # producer. Therefore we don't need to provide any IPushProducer
210         # methods. We implement all the IConsumer methods as pass-throughs,
211         # and only intercept write() to perform decryption.
212         self._consumer.registerProducer(producer, streaming)
213     def unregisterProducer(self):
214         self._consumer.unregisterProducer()
215     def write(self, ciphertext):
216         started = now()
217         plaintext = self._decryptor.process(ciphertext)
218         if self._read_ev:
219             elapsed = now() - started
220             self._read_ev.update(0, elapsed, 0)
221         if self._download_status:
222             self._download_status.add_misc_event("AES", started, now())
223         self._consumer.write(plaintext)
224
225 class ImmutableFileNode:
226     implements(IImmutableFileNode)
227
228     # I wrap a CiphertextFileNode with a decryption key
229     def __init__(self, filecap, storage_broker, secret_holder, terminator,
230                  history):
231         assert isinstance(filecap, uri.CHKFileURI)
232         verifycap = filecap.get_verify_cap()
233         self._cnode = CiphertextFileNode(verifycap, storage_broker,
234                                          secret_holder, terminator, history)
235         assert isinstance(filecap, uri.CHKFileURI)
236         self.u = filecap
237         self._readkey = filecap.key
238
239     # TODO: I'm not sure about this.. what's the use case for node==node? If
240     # we keep it here, we should also put this on CiphertextFileNode
241     def __hash__(self):
242         return self.u.__hash__()
243     def __eq__(self, other):
244         if isinstance(other, ImmutableFileNode):
245             return self.u.__eq__(other.u)
246         else:
247             return False
248     def __ne__(self, other):
249         if isinstance(other, ImmutableFileNode):
250             return self.u.__eq__(other.u)
251         else:
252             return True
253
254     def read(self, consumer, offset=0, size=None):
255         decryptor = DecryptingConsumer(consumer, self._readkey, offset)
256         d = self._cnode.read(decryptor, offset, size)
257         d.addCallback(lambda dc: consumer)
258         return d
259
260     def raise_error(self):
261         pass
262
263     def get_write_uri(self):
264         return None
265
266     def get_readonly_uri(self):
267         return self.get_uri()
268
269     def get_uri(self):
270         return self.u.to_string()
271     def get_cap(self):
272         return self.u
273     def get_readcap(self):
274         return self.u.get_readonly()
275     def get_verify_cap(self):
276         return self.u.get_verify_cap()
277     def get_repair_cap(self):
278         # CHK files can be repaired with just the verifycap
279         return self.u.get_verify_cap()
280
281     def get_storage_index(self):
282         return self.u.get_storage_index()
283
284     def get_size(self):
285         return self.u.get_size()
286     def get_current_size(self):
287         return defer.succeed(self.get_size())
288
289     def is_mutable(self):
290         return False
291
292     def is_readonly(self):
293         return True
294
295     def is_unknown(self):
296         return False
297
298     def is_allowed_in_immutable_directory(self):
299         return True
300
301     def check_and_repair(self, monitor, verify=False, add_lease=False):
302         return self._cnode.check_and_repair(monitor, verify, add_lease)
303     def check(self, monitor, verify=False, add_lease=False):
304         return self._cnode.check(monitor, verify, add_lease)
305
306     def get_best_readable_version(self):
307         """
308         Return an IReadable of the best version of this file. Since
309         immutable files can have only one version, we just return the
310         current filenode.
311         """
312         return defer.succeed(self)
313
314
315     def download_best_version(self):
316         """
317         Download the best version of this file, returning its contents
318         as a bytestring. Since there is only one version of an immutable
319         file, we download and return the contents of this file.
320         """
321         d = consumer.download_to_data(self)
322         return d
323
324     # for an immutable file, download_to_data (specified in IReadable)
325     # is the same as download_best_version (specified in IFileNode). For
326     # mutable files, the difference is more meaningful, since they can
327     # have multiple versions.
328     download_to_data = download_best_version
329
330
331     # get_size() (IReadable), get_current_size() (IFilesystemNode), and
332     # get_size_of_best_version(IFileNode) are all the same for immutable
333     # files.
334     get_size_of_best_version = get_current_size