]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
0bcacb001440960e8042fef8b9e62679f1cfda4e
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """I give access to the ciphertext of an immutable (CHK) file named by a
    verifycap. Reads go through a DownloadNode that I create lazily on first
    use; I can also run the Checker and Repairer against the file's shares.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and DownloadNode if not yet created.

        The DownloadStatus is added to the history (when there is one) only
        when it is first created; later calls reuse the existing objects.
        """
        if not self._download_status:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    @staticmethod
    def _merged_servers_responding(servers_responding, sharemap):
        """Return a list of every server in 'servers_responding' plus every
        server that appears in 'sharemap'.

        'sharemap' is assumed to have the same shape as the checker's
        sharemap it gets merged into (share number -> set of servers), so the
        servers are taken from its *values*; its keys are share numbers and
        must not be added. The order of the returned list is unspecified.
        """
        merged = set(servers_responding)
        for servers in sharemap.values():
            merged.update(servers)
        return list(merged)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file's shares, and repair them if they are unhealthy.

        A Checker is run against the currently connected servers. If its
        results say the file is healthy, no repair is attempted and those
        results double as the post-repair results. Otherwise a Repairer is
        started and post-repair results are derived from the pre-repair
        results plus the repairer's upload results.

        @param monitor: passed to the Checker and the Repairer.
        @param verify: passed to the Checker.
        @param add_lease: passed to the Checker.
        @return: a Deferred that fires with a CheckAndRepairResults. If the
            repair fails, the failure is stored in crr.repair_failure and the
            Deferred errbacks with that same failure.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        c = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful
            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sm = prr.data['sharemap']
                assert isinstance(sm, DictOfSets), sm
                # fold in the shares placed by the repairer
                sm.update(ur.sharemap)
                # The servers holding the newly placed shares responded too.
                # (Previously a discarded set.union() of share *numbers*.)
                prr.data['servers-responding'] = \
                    self._merged_servers_responding(
                        prr.data['servers-responding'], ur.sharemap)
                prr.data['count-shares-good'] = len(sm)
                prr.data['count-good-share-hosts'] = len(reduce(set.union,
                                                   sm.itervalues(), set()))
                is_healthy = bool(len(sm) >= verifycap.total_shares)
                is_recoverable = bool(len(sm) >= verifycap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                # NOTE(review): needs_rebalancing uses the same condition as
                # is_healthy, which looks questionable; preserved unchanged --
                # confirm the intended semantics.
                prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr
            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f
            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            d = r.start()
            d.addCallbacks(_gather_repair_results, _repair_error)
            return d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Run a Checker over the connected servers; return its Deferred."""
        verifycap = self._verifycap
        sb = self._storage_broker
        servers = sb.get_connected_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()
161
class DecryptingConsumer:
    """A decrypting shim between a ciphertext Producer (for example the
    downloader) and the real IConsumer.

    Producer (un)registration is forwarded untouched, so the real Consumer
    flow-controls the real Producer directly, while the Producer only ever
    sees this object. Only write() is intercepted: each chunk of ciphertext
    is AES-decrypted and the plaintext is handed on to the real Consumer.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # pycryptopp's CTR-mode AES has no random-access API, so we fake a
        # start at 'offset': the iv is the 128-bit big-endian index of the
        # 16-byte block containing 'offset', and the keystream bytes that
        # precede 'offset' within that block are consumed by processing
        # zero bytes.
        # TODO: use AES(readkey, offset) or a.process(data, offset=...) once
        # pycryptopp offers it.
        block_index, skip = divmod(offset, 16)
        counter_iv = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=counter_iv)
        self._decryptor.process("\x00" * skip)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that decryption time is reported to."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the DownloadStatus that AES events are logged to."""
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # Pure pass-through: the real consumer flow-controls the real
        # producer, so no IPushProducer methods are needed here.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext', record the time spent, and forward the
        resulting plaintext to the real consumer."""
        t0 = now()
        decrypted = self._decryptor.process(ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - t0, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", t0, now())
        self._consumer.write(decrypted)
206
class ImmutableFileNode:
    """The IImmutableFileNode for a CHK file.

    I wrap a CiphertextFileNode (built from the filecap's verifycap) together
    with the filecap's read key, and decrypt data on the way out of read().
    """
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so this must be the
        # explicit negation of __eq__ for both ImmutableFileNode and
        # non-ImmutableFileNode operands.
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed the plaintext selected by 'offset'/'size' to 'consumer'.

        A DecryptingConsumer is interposed between the ciphertext node and
        'consumer', so 'consumer' only sees decrypted bytes. Returns a
        Deferred that fires with 'consumer' when the read is finished.
        (The 'consumer' parameter shadows the module-level 'consumer'
        import, which this method does not use.)
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        # immutable files have no write cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        """Return a Deferred that fires with the (fixed) file size."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped CiphertextFileNode.check_and_repair()."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped CiphertextFileNode.check()."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size
314     # get_size_of_best_version(IFileNode) are all the same for immutable
315     # files.
316     get_size_of_best_version = get_current_size