]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
Rewrite download-status-timeline visualizer ('viz') with d3.js
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """Read-only access to an immutable (CHK) file's ciphertext.

    Identified by a CHK verify-cap. It can read ranges or segments of the
    ciphertext, and can check or check-and-repair the file's shares. The
    DownloadStatus and DownloadNode used for reading are created lazily,
    on first use.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history # may be falsy; then downloads aren't recorded
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus (registering it with history, if any)
        and the DownloadNode that uses it, unless they already exist."""
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        """Does nothing for this node type."""
        pass

    def _make_checker(self, monitor, verify, add_lease):
        """Build a Checker for this file over the currently-connected
        servers. Shared by check() and check_and_repair()."""
        servers = self._storage_broker.get_connected_servers()
        return Checker(verifycap=self._verifycap, servers=servers,
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder, monitor=monitor)

    @staticmethod
    def _servers_in_sharemap(sharemap):
        """Return the set of every server id found in the value-sets of a
        sharemap whose keys are share numbers and whose values are
        iterables of server ids."""
        servers = set()
        for serverids in sharemap.values():
            servers.update(serverids)
        return servers

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file and, if the check says it is not healthy, run a
        Repairer on it.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        repair fails, the CheckAndRepairResults records the failure and the
        Deferred errbacks with it.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        sh = self._secret_holder

        d = self._make_checker(monitor, verify, add_lease).start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful
            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sm = prr.data['sharemap']
                assert isinstance(sm, DictOfSets), sm
                sm.update(ur.sharemap)
                # ur.sharemap is merged into sm, whose keys are counted as
                # shares (count-shares-good below). So its keys are share
                # numbers and the server ids are in its values.
                servers_responding = set(prr.data['servers-responding'])
                servers_responding.update(
                    self._servers_in_sharemap(ur.sharemap))
                prr.data['servers-responding'] = list(servers_responding)
                prr.data['count-shares-good'] = len(sm)
                # number of distinct servers holding at least one good share
                prr.data['count-good-share-hosts'] = \
                    len(self._servers_in_sharemap(sm))
                is_healthy = bool(len(sm) >= verifycap.total_shares)
                is_recoverable = bool(len(sm) >= verifycap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                # NOTE(review): this flags "needs rebalancing" exactly when
                # the file is healthy, which looks inverted or incomplete --
                # confirm the intended condition against the Checker.
                prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr
            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f
            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            repair_d = r.start()
            repair_d.addCallbacks(_gather_repair_results, _repair_error)
            return repair_d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check the file's shares on the connected servers.

        Returns the Deferred from Checker.start(). check_and_repair()
        treats its result as a CheckResults.
        """
        return self._make_checker(monitor, verify, add_lease).start()
160
class DecryptingConsumer:
    """Decrypting IConsumer shim.

    The Producer (a ciphertext downloader) writes to me as though I were
    the real consumer. I decrypt each chunk and forward the plaintext to the
    real consumer. Producer registration is forwarded untouched, so the real
    consumer flow-controls the real producer directly.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: we want
        # either a=AES(readkey, offset) or, better, both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # Until then, emulate a seek with the iv= argument: start at the
        # 16-byte block containing `offset`, then burn keystream up to it.
        block_index, within_block = divmod(offset, 16)
        counter = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=counter)
        self._decryptor.process("\x00" * within_block)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() reports decrypt time to."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the status object that write() logs "AES" events to."""
        self._download_status = ds

    # Producer registration is a pure pass-through (only write() is
    # intercepted), so no IPushProducer methods are needed on this class.
    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt one chunk, record how long that took, pass it on."""
        t_start = now()
        plaintext = self._decryptor.process(ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - t_start, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", t_start, now())
        self._consumer.write(plaintext)
205
class ImmutableFileNode:
    """An immutable (CHK) file node.

    It wraps a CiphertextFileNode and decrypts what that node reads, using
    the read-key taken from the filecap.
    """
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Must be the exact negation of __eq__. Previously this returned the
        # __eq__ result itself for two ImmutableFileNodes, so equal nodes
        # compared as "not equal".
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed the plaintext of the requested range to `consumer`.

        offset and size are passed through to CiphertextFileNode.read().
        Returns a Deferred that fires with `consumer` when the read finishes.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Does nothing for this node type."""
        pass

    def get_write_uri(self):
        # immutable: there is no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        # `consumer` here is the allmydata.util.consumer module
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size
313     # get_size_of_best_version(IFileNode) are all the same for immutable
314     # files.
315     get_size_of_best_version = get_current_size