]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
d8735189633802e765b5fbbc81684be21d95954c
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """I am the ciphertext-level node for one immutable (CHK) file.

    I hold only the file's verifycap, so I can download ciphertext, check
    share health, and repair, but I cannot decrypt anything.
    ImmutableFileNode wraps me together with the read key to provide
    plaintext reads.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history # optional: only used when truthy
        self._download_status = None # created lazily, on first read
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and DownloadNode on first use.

        The new DownloadStatus is registered with the history object when
        one was supplied. Both objects are cached on self, so repeated
        calls reuse them. Both are tested against None (not truthiness),
        so an existing status object is never silently replaced.
        """
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size,
        as reported by the (lazily created) download node."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        """Return the storage index from my verifycap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verifycap I was constructed with."""
        return self._verifycap

    def get_size(self):
        """Return the file size recorded in my verifycap."""
        return self._verifycap.size

    def raise_error(self):
        """Do nothing (always returns None)."""
        pass

    def _make_checker(self, monitor, verify, add_lease):
        """Build a Checker for this file against the currently connected
        servers. check() and check_and_repair() both use this, so the two
        paths always configure the Checker identically."""
        return Checker(verifycap=self._verifycap,
                       servers=self._storage_broker.get_connected_servers(),
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder,
                       monitor=monitor)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file's shares and repair them if they are unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        repair itself fails, the failure is recorded on the results object
        and then propagated as an errback.
        """
        d = self._make_checker(monitor, verify, add_lease).start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Given check results `cr`, start a repair unless the file is
        already healthy. Returns a Deferred firing with the
        CheckAndRepairResults."""
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            # Nothing to do: the post-repair state is the pre-repair state.
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = f
            return f
        r = Repairer(self, storage_broker=self._storage_broker,
                     secret_holder=self._secret_holder,
                     monitor=monitor)
        d = r.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Combine the pre-repair check results `cr` with the repairer's
        upload results `ur` into post-repair results, store them on `crr`,
        and return `crr`."""
        assert IUploadResults.providedBy(ur), ur
        # clone the cr (check results) to form the basis of the
        # prr (post-repair results)
        prr = CheckResults(cr.uri, cr.storage_index)
        prr_data = copy.deepcopy(cr.get_data())

        verifycap = self._verifycap
        servers_responding = set(prr_data['servers-responding'])
        sm = prr_data['sharemap']
        assert isinstance(sm, DictOfSets), sm
        # Merge the shares the repairer just placed into the cloned sharemap.
        for shnum, servers in ur.get_sharemap().items():
            for s in servers:
                serverid = s.get_serverid()
                sm.add(shnum, serverid)
                servers_responding.add(serverid)

        # Distinct servers holding at least one share.
        good_hosts = len(set().union(*sm.values()))
        is_healthy = bool(len(sm) >= verifycap.total_shares)
        is_recoverable = bool(len(sm) >= verifycap.needed_shares)
        prr.set_data(
            count_shares_needed=verifycap.needed_shares,
            count_shares_expected=verifycap.total_shares,
            count_shares_good=len(sm),
            count_good_share_hosts=good_hosts,
            count_recoverable_versions=int(is_recoverable),
            count_unrecoverable_versions=int(not is_recoverable),
            servers_responding=sorted(servers_responding),
            sharemap=sm,
            count_wrong_shares=0, # no such thing as wrong, for immutable
            list_corrupt_shares=prr_data["list-corrupt-shares"],
            count_corrupt_shares=prr_data["count-corrupt-shares"],
            list_incompatible_shares=prr_data["list-incompatible-shares"],
            count_incompatible_shares=prr_data["count-incompatible-shares"],
            )
        prr.set_healthy(is_healthy)
        prr.set_recoverable(is_recoverable)
        crr.repair_successful = is_healthy
        # NOTE(review): this flags "needs rebalancing" exactly when the file
        # is healthy, which looks questionable -- confirm the intended
        # semantics before changing it.
        prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check this file's shares without repairing. Returns the Deferred
        produced by Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()
177
class DecryptingConsumer:
    """Consumer shim that decrypts data as it flows through.

    A CiphertextDownloader (acting as the Producer) writes ciphertext into
    me; I decrypt it and hand the plaintext to the wrapped, real Consumer.
    Producer registration is forwarded untouched, so the real Consumer talks
    to the real Producer while the Producer only ever sees me.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # pycryptopp's CTR-mode AES offers no random access, so emulate a
        # seek: point the counter (via iv=) at the 16-byte block holding
        # `offset`, then discard keystream for the bytes of that block that
        # come before `offset`.
        # TODO: use either a=AES(readkey, offset) or, better,
        # a=AES(readkey, offset=0) plus a.process(ciphertext, offset=xyz)
        # once pycryptopp supports them.
        block_index, within_block = divmod(offset, 16)
        counter_iv = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=counter_iv)
        self._decryptor.process("\x00" * within_block)

    def set_download_status_read_event(self, read_ev):
        self._read_ev = read_ev

    def set_download_status(self, ds):
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # Pass-through: the real consumer flow-controls the real producer
        # directly, so no IPushProducer methods are needed here. Only
        # write() is intercepted (to decrypt).
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        t0 = now()
        cleartext = self._decryptor.process(ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - t0, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", t0, now())
        self._consumer.write(cleartext)
222
class ImmutableFileNode:
    """An immutable (CHK) file node.

    I wrap a CiphertextFileNode with the file's decryption (read) key, so
    that read() delivers plaintext. Check and repair are delegated to the
    wrapped ciphertext node.
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Must be the exact negation of __eq__. Returning the raw URI
        # equality here would make equal nodes compare as "not equal".
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed plaintext bytes [offset, offset+size) to `consumer`.

        Ciphertext from the wrapped node is decrypted on the fly by a
        DecryptingConsumer. Returns a Deferred that fires with `consumer`.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing (always returns None)."""
        pass

    def get_write_uri(self):
        # Immutable files have no write capability.
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        """Return a Deferred that fires with the file size (already known
        from the filecap)."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size
330     # get_size_of_best_version(IFileNode) are all the same for immutable
331     # files.
332     get_size_of_best_version = get_current_size