]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
1c3780ba6f39bb8a4e9e4a063fe62f690c57ca34
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import time
4 now = time.time
5 from zope.interface import implements
6 from twisted.internet import defer
7
8 from allmydata import uri
9 from twisted.internet.interfaces import IConsumer
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata.util import consumer
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from allmydata.util.happinessutil import servers_of_happiness
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """I give access to the ciphertext of a single immutable file, which is
    identified by its verify-cap. I can feed ciphertext to a consumer, fetch
    individual segments, and check (and optionally repair) the file. I do
    not hold a decryption key."""

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        # Both of these are filled in lazily by
        # _maybe_create_download_node(), the first time somebody needs
        # to download something.
        self._download_status = None
        self._node = None

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and the DownloadNode if they do not
        exist yet. A freshly created DownloadStatus is also registered with
        the history object, when there is one."""
        if not self._download_status:
            status = DownloadStatus(self._verifycap.storage_index,
                                    self._verifycap.size)
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is not None:
            return
        self._node = DownloadNode(self._verifycap, self._storage_broker,
                                  self._secret_holder,
                                  self._terminator,
                                  self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        download_node = self._node
        return download_node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in my verify-cap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verify-cap I was created with."""
        return self._verifycap

    def get_size(self):
        """Return the file size recorded in my verify-cap."""
        return self._verifycap.size

    def raise_error(self):
        """Do nothing: I never have a stored error to raise."""
        pass

    def is_mutable(self):
        """Return False: I only represent immutable files."""
        return False

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Run a Checker against the currently connected servers, then
        attempt a repair if the file is not healthy. I return a Deferred
        that fires with a CheckAndRepairResults instance."""
        connected = self._storage_broker.get_connected_servers()
        checker = Checker(verifycap=self._verifycap,
                          servers=connected,
                          verify=verify,
                          add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        d = checker.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Given the check results 'cr', return a Deferred that fires with
        a CheckAndRepairResults. A healthy file is reported as-is, with the
        same results used for both 'pre' and 'post'; otherwise a Repairer
        is started and its upload results are folded into the post-repair
        results."""
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            # nothing to fix: the post-repair state is the pre-repair state
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        # pessimistic default, overwritten by _gather_repair_results
        crr.repair_successful = False

        def _note_repair_failure(failure):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = failure
            return failure

        repairer = Repairer(self,
                            storage_broker=self._storage_broker,
                            secret_holder=self._secret_holder,
                            monitor=monitor)
        d = repairer.start()
        d.addCallbacks(self._gather_repair_results, _note_repair_failure,
                       callbackArgs=(cr, crr))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Build the post-repair CheckResults by merging the pre-repair
        sharemap from 'cr' with the sharemap of the upload results 'ur',
        store it on 'crr', and return 'crr'. The repair is recorded as
        successful when the merged sharemap holds at least total_shares
        distinct share numbers."""
        assert IUploadResults.providedBy(ur), ur
        # The post-repair results (prr) start out as a copy of the
        # pre-repair check results, to which we add whatever the repairer
        # uploaded.
        cap = self._verifycap
        responders = set(cr.get_servers_responding())
        sm = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        # shares that were already in place before the repair
        for shnum, holders in cr.get_sharemap().items():
            for holder in holders:
                sm.add(shnum, holder)
        # shares placed by the repair; the servers that accepted them
        # evidently responded as well
        for shnum, holders in ur.get_sharemap().items():
            for holder in holders:
                sm.add(shnum, holder)
                responders.add(holder)
        responders = sorted(responders)

        # number of distinct servers holding at least one share
        all_holders = reduce(set.union, sm.values(), set())
        good_hosts = len(all_holders)
        # number of distinct share numbers present somewhere
        good_shares = len(sm)
        is_healthy = bool(good_shares >= cap.total_shares)
        is_recoverable = bool(good_shares >= cap.needed_shares)

        count_happiness = servers_of_happiness(sm)

        prr = CheckResults(
            cr.get_uri(), cr.get_storage_index(),
            healthy=is_healthy,
            recoverable=is_recoverable,
            count_happiness=count_happiness,
            count_shares_needed=cap.needed_shares,
            count_shares_expected=cap.total_shares,
            count_shares_good=good_shares,
            count_good_share_hosts=good_hosts,
            count_recoverable_versions=int(is_recoverable),
            count_unrecoverable_versions=int(not is_recoverable),
            servers_responding=list(responders),
            sharemap=sm,
            # no such thing as wrong, for immutable
            count_wrong_shares=0,
            list_corrupt_shares=cr.get_corrupt_shares(),
            count_corrupt_shares=len(cr.get_corrupt_shares()),
            list_incompatible_shares=cr.get_incompatible_shares(),
            count_incompatible_shares=len(cr.get_incompatible_shares()),
            summary="",
            report=[],
            share_problems=[],
            servermap=None,
            )
        crr.repair_successful = is_healthy
        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Run a Checker against the currently connected servers and
        return whatever its start() returns."""
        connected = self._storage_broker.get_connected_servers()
        checker = Checker(verifycap=self._verifycap,
                          servers=connected,
                          verify=verify,
                          add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        return checker.start()
184
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        """Wrap 'consumer', and prepare an AES decryptor (keyed with
        'readkey') that is positioned at byte 'offset' of the ciphertext."""
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        #
        # The iv is the number of whole 16-byte blocks that precede
        # 'offset', written as 32 hex digits. The remaining bytes are
        # skipped by processing that many throwaway zero bytes.
        whole_blocks, leftover = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % whole_blocks)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*leftover)

    def set_download_status_read_event(self, read_ev):
        """Remember the read-event object that write() should update."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download-status object that write() should add
        its "AES" events to."""
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Pass through to the real consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext', report how long that took to the read
        event and the download status (when they have been set), then hand
        the plaintext to the real consumer."""
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - started, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", started, now())
        self._consumer.write(plaintext)
229
class ImmutableFileNode:
    """I wrap a CiphertextFileNode with a decryption key, so that reads
    deliver plaintext. Checking and repairing are delegated to the wrapped
    ciphertext node; cap-related questions are answered from the filecap I
    was created with."""
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Must be the exact opposite of __eq__. (This used to return the
        # result of the equality test itself, so two nodes with the same
        # cap were reported as both equal and unequal.)
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with the requested range of plaintext. I return
        a Deferred that fires with 'consumer' when the read is finished."""
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        # the ciphertext node fires with the DecryptingConsumer it was
        # given; callers expect their own consumer back instead
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        # immutable files have no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        """Return a Deferred that has already fired with get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped ciphertext node."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped ciphertext node."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size