]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
switch UploadResults to use getters, hide internal data, for all but .uri
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """I represent the ciphertext of a CHK file, identified only by its
    verify-cap. I hand ciphertext to consumers through a DownloadNode that
    I build on first use, and I can check (and check-and-repair) the file
    with the Checker and Repairer classes."""

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        # both of these are built on demand by
        # _maybe_create_download_node()
        self._node = None
        self._download_status = None

    def _maybe_create_download_node(self):
        """Make sure I have a DownloadStatus and a DownloadNode, creating
        whichever one is missing. A freshly created DownloadStatus is also
        handed to my history object, when I have one."""
        cap = self._verifycap
        if not self._download_status:
            status = DownloadStatus(cap.storage_index, cap.size)
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is not None:
            return
        self._node = DownloadNode(cap, self._storage_broker,
                                  self._secret_holder,
                                  self._terminator,
                                  self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment
        size."""
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in my verify-cap."""
        cap = self._verifycap
        return cap.storage_index

    def get_verify_cap(self):
        """Return the verify-cap I was created with."""
        return self._verifycap

    def get_size(self):
        """Return the size recorded in my verify-cap."""
        cap = self._verifycap
        return cap.size

    def raise_error(self):
        """Do nothing and return None."""
        return None

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Run a Checker over my file and, unless it reports the file as
        healthy, run a Repairer afterwards. I return a Deferred that fires
        with a CheckAndRepairResults. If the repair itself fails, the
        failure is recorded on the results object and then passed on down
        the Deferred."""
        cap = self._verifycap
        si = cap.storage_index
        broker = self._storage_broker
        connected = broker.get_connected_servers()
        secrets = self._secret_holder

        checker = Checker(verifycap=cap, servers=connected,
                          verify=verify, add_lease=add_lease,
                          secret_holder=secrets, monitor=monitor)
        check_d = checker.start()

        def _maybe_repair(cr):
            crr = CheckAndRepairResults(si)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                # nothing to repair: the pre-repair results double as the
                # post-repair results
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful

            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # the post-repair results (prr) start out as a clone of the
                # check results (cr), and are then amended with whatever
                # the repairer's upload placed
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sharemap = prr.data['sharemap']
                assert isinstance(sharemap, DictOfSets), sharemap
                sharemap.update(ur.get_sharemap())

                responders = set(prr.data['servers-responding'])
                for _shnum, serverids in ur.get_sharemap().items():
                    responders.update(serverids)
                prr.data['servers-responding'] = sorted(responders)

                prr.data['count-shares-good'] = len(sharemap)
                all_hosts = reduce(set.union, sharemap.itervalues(), set())
                prr.data['count-good-share-hosts'] = len(all_hosts)

                is_healthy = bool(len(sharemap) >= cap.total_shares)
                is_recoverable = bool(len(sharemap) >= cap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                # NOTE(review): this is the same comparison that decides
                # is_healthy -- confirm that it is really what "needs
                # rebalancing" is meant to report.
                prr.set_needs_rebalancing(len(sharemap) >= cap.total_shares)

                crr.post_repair_results = prr
                return crr

            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f

            repairer = Repairer(self, storage_broker=broker,
                                secret_holder=secrets, monitor=monitor)
            repair_d = repairer.start()
            repair_d.addCallbacks(_gather_repair_results, _repair_error)
            return repair_d

        check_d.addCallback(_maybe_repair)
        return check_d

    def check(self, monitor, verify=False, add_lease=False):
        """Start a Checker over my file, using the currently connected
        servers, and return whatever its start() method returns."""
        broker = self._storage_broker
        checker = Checker(verifycap=self._verifycap,
                          servers=broker.get_connected_servers(),
                          verify=verify, add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        return checker.start()
163
class DecryptingConsumer:
    """I stand between a ciphertext Producer and the real Consumer. Every
    write() that reaches me is decrypted and then forwarded to the real
    Consumer. Producer registration is passed straight through, so the real
    Consumer deals directly with the real Producer, while the Producer only
    ever sees me."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument: the IV is the
        # index of the 16-byte block that contains 'offset', and we then
        # push through (and discard) as many dummy bytes as 'offset' lies
        # beyond the start of that block.
        block_index, bytes_into_block = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*bytes_into_block)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() should update with
        decryption time."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download status object on which write() should
        record "AES" events."""
        self._download_status = ds

    # All IConsumer methods other than write() are plain pass-throughs. That
    # lets the real consumer flow-control the real producer, so we don't
    # need to provide any IPushProducer methods ourselves.

    def registerProducer(self, producer, streaming):
        """Register the producer directly with the real consumer."""
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Unregister the producer from the real consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext' and write the result to the real consumer,
        reporting the time spent to the read event and the download status
        when those have been set."""
        began = now()
        cleartext = self._decryptor.process(ciphertext)
        if self._read_ev:
            self._read_ev.update(0, now() - began, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", began, now())
        self._consumer.write(cleartext)
208
class ImmutableFileNode:
    """I wrap a CiphertextFileNode with a decryption key. Reads are routed
    through a DecryptingConsumer; check() and check_and_repair() are
    delegated to the CiphertextFileNode unchanged."""
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        """Two ImmutableFileNodes are equal when their filecaps are equal.
        Anything that is not an ImmutableFileNode is never equal to me."""
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        return False

    def __ne__(self, other):
        """The exact inverse of __eq__."""
        if isinstance(other, ImmutableFileNode):
            # negate the equality result: returning it as-is would make
            # 'a != b' true for equal nodes and false for different ones
            return not self.u.__eq__(other.u)
        return True

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with the requested range of plaintext. I return
        a Deferred that fires with 'consumer' (not with the intermediate
        DecryptingConsumer) when the read is finished."""
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing."""
        pass

    def get_write_uri(self):
        """Always None: I never offer a write URI."""
        return None

    def get_readonly_uri(self):
        """Same as get_uri()."""
        return self.get_uri()

    def get_uri(self):
        """Return my filecap in string form."""
        return self.u.to_string()

    def get_cap(self):
        """Return my filecap object."""
        return self.u

    def get_readcap(self):
        """Return the read-only form of my filecap."""
        return self.u.get_readonly()

    def get_verify_cap(self):
        """Return the verify-cap derived from my filecap."""
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        """Return the cap needed to repair me."""
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return the storage index reported by my filecap."""
        return self.u.get_storage_index()

    def get_size(self):
        """Return the size reported by my filecap."""
        return self.u.get_size()

    def get_current_size(self):
        """Return an already-fired Deferred carrying get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to my CiphertextFileNode's check_and_repair()."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to my CiphertextFileNode's check()."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)

    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version

    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size