git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
change UploadResults to return IServers, update users to match
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from allmydata.interfaces import IImmutableFileNode, IUploadResults
12 from allmydata.util import consumer
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.util.dictutil import DictOfSets
15 from pycryptopp.cipher.aes import AES
16
17 # local imports
18 from allmydata.immutable.checker import Checker
19 from allmydata.immutable.repairer import Repairer
20 from allmydata.immutable.downloader.node import DownloadNode, \
21      IDownloadStatusHandlingConsumer
22 from allmydata.immutable.downloader.status import DownloadStatus
23
class CiphertextFileNode:
    """I represent the ciphertext of an immutable (CHK) file, identified by
    its verify-cap. I can download ranges/segments of the ciphertext, check
    the file's shares, and repair it. I hold no decryption key; the
    plaintext view (ImmutableFileNode) wraps me together with the key."""

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus (registering it with history, if one
        was supplied) and the DownloadNode, each at most once."""
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        # return a Deferred that fires with the file's real segment size
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def _make_checker(self, monitor, verify, add_lease):
        """Build a Checker over the currently-connected servers. Shared by
        check() and check_and_repair() so both pass identical arguments."""
        servers = self._storage_broker.get_connected_servers()
        return Checker(verifycap=self._verifycap, servers=servers,
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder, monitor=monitor)

    def _build_post_repair_results(self, cr, ur):
        """Form the post-repair CheckResults.

        The pre-repair check results `cr` are cloned, and the shares that
        the repairer reports placing (the sharemap of upload results `ur`)
        are merged in. Returns a (prr, is_healthy) tuple.
        """
        verifycap = self._verifycap
        # clone the cr (check results) to form the basis of the
        # prr (post-repair results)
        prr = CheckResults(cr.uri, cr.storage_index)
        prr.data = copy.deepcopy(cr.data)

        servers_responding = set(prr.data['servers-responding'])
        sm = prr.data['sharemap']
        assert isinstance(sm, DictOfSets), sm
        # The upload results hold server objects, but the check-results
        # sharemap records serverids, so convert each one as it is merged.
        for shnum, uploaded_to in ur.get_sharemap().items():
            for server in uploaded_to:
                serverid = server.get_serverid()
                sm.add(shnum, serverid)
                servers_responding.add(serverid)
        prr.data['servers-responding'] = sorted(servers_responding)

        num_shares = len(sm)  # number of distinct share numbers present
        prr.data['count-shares-good'] = num_shares
        # number of distinct servers holding at least one good share
        good_hosts = len(set().union(*sm.values()))
        prr.data['count-good-share-hosts'] = good_hosts
        is_healthy = bool(num_shares >= verifycap.total_shares)
        is_recoverable = bool(num_shares >= verifycap.needed_shares)
        prr.set_healthy(is_healthy)
        prr.set_recoverable(is_recoverable)
        # NOTE(review): this uses the same condition as is_healthy; kept
        # as-is to preserve existing behavior -- confirm the intent.
        prr.set_needs_rebalancing(num_shares >= verifycap.total_shares)
        return prr, is_healthy

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file's shares. If the check reports the file as
        unhealthy, attempt a repair.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        repair itself fails, the failure is recorded on the results object
        (repair_failure) and then propagated down the errback chain.
        """
        storage_index = self._verifycap.storage_index
        d = self._make_checker(monitor, verify, add_lease).start()

        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful

            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                prr, is_healthy = self._build_post_repair_results(cr, ur)
                crr.repair_successful = is_healthy
                crr.post_repair_results = prr
                return crr

            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f

            r = Repairer(self, storage_broker=self._storage_broker,
                         secret_holder=self._secret_holder, monitor=monitor)
            rd = r.start()
            rd.addCallbacks(_gather_repair_results, _repair_error)
            return rd

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check (and, if verify=True, verify) the file's shares without
        repairing them. Returns the Deferred produced by Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()
164
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        #
        # The iv is the 16-byte AES block index (formatted as 32 hex digits,
        # big-endian) of the block containing `offset`. Pushing
        # `offset_small` zero bytes through the decryptor then advances the
        # keystream to the exact byte position within that block.
        offset_big, offset_small = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*offset_small)

    def set_download_status_read_event(self, read_ev):
        self._read_ev = read_ev
    def set_download_status(self, ds):
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()
    def write(self, ciphertext):
        """Decrypt `ciphertext`, record the decryption time on the read
        event / download status (when set), and pass the plaintext on."""
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        # Take a single end timestamp so that both records below describe
        # exactly the same interval: the AES work alone.
        finished = now()
        if self._read_ev:
            self._read_ev.update(0, finished - started, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", started, finished)
        self._consumer.write(plaintext)
209
class ImmutableFileNode:
    """I am the plaintext view of an immutable (CHK) file. I wrap a
    CiphertextFileNode, built from the filecap's verify-cap, together with
    the filecap's decryption key, and decrypt data on read()."""
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Python 2 does not derive != from ==, so __ne__ must explicitly be
        # the negation of __eq__ (otherwise equal nodes would compare "!=").
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed `consumer` with decrypted plaintext for the requested range.
        Returns a Deferred that fires with `consumer` when the read is
        finished."""
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size