]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
0ac47a6c2750c9708c6b733b717cc3815b062f90
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8
9 from allmydata import uri
10 from twisted.internet.interfaces import IConsumer
11 from twisted.protocols import basic
12 from foolscap.api import eventually
13 from allmydata.interfaces import IImmutableFileNode, ICheckable, \
14      IDownloadTarget, IUploadResults
15 from allmydata.util import dictutil, log, base32, consumer
16 from allmydata.immutable.checker import Checker
17 from allmydata.check_results import CheckResults, CheckAndRepairResults
18 from allmydata.util.dictutil import DictOfSets
19 from pycryptopp.cipher.aes import AES
20
21 # local imports
22 from allmydata.immutable.checker import Checker
23 from allmydata.immutable.repairer import Repairer
24 from allmydata.immutable.downloader.node import DownloadNode, \
25      IDownloadStatusHandlingConsumer
26 from allmydata.immutable.downloader.status import DownloadStatus
27
class CiphertextFileNode:
    """Node for an immutable (CHK) file, identified only by its verify cap.

    It can fetch ciphertext (read(), get_segment()) and check or repair the
    file's shares. It holds no read key. The DownloadStatus and DownloadNode
    it delegates downloads to are created lazily, on first use.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and the DownloadNode if not done yet.

        The DownloadStatus is passed to self._history.add_download() when a
        history object was supplied.
        """
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def _make_checker(self, monitor, verify, add_lease):
        """Build a Checker for this file over the currently-connected
        storage servers (shared by check() and check_and_repair())."""
        servers = self._storage_broker.get_connected_servers()
        return Checker(verifycap=self._verifycap, servers=servers,
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder,
                       monitor=monitor)

    @staticmethod
    def _merge_repair_sharemap(sharemap, repaired_sharemap,
                               servers_responding):
        """Fold the repairer's share placements into a post-repair sharemap.

        Both sharemaps map share number -> set of server ids. `sharemap` is
        updated in place (no sets are shared with `repaired_sharemap`).
        Every server that received a share is added to the set
        `servers_responding`, also in place.

        Returns the number of distinct servers that hold at least one share
        after the merge.
        """
        for shnum, serverids in repaired_sharemap.items():
            sharemap.setdefault(shnum, set()).update(serverids)
            # the *servers* (values), not the share numbers (keys), are what
            # belongs in servers-responding
            servers_responding.update(serverids)
        hosts = set()
        for serverids in sharemap.values():
            hosts.update(serverids)
        return len(hosts)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file and attempt a repair if it is not healthy.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        check reports healthy, no repair is attempted and the same check
        results are used as both pre- and post-repair results. Otherwise a
        Repairer is run, and post-repair results are derived from the
        pre-repair ones plus the repairer's upload sharemap. If the repair
        fails, the failure is recorded as crr.repair_failure and then
        passed on, so the returned Deferred errbacks.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        sh = self._secret_holder

        d = self._make_checker(monitor, verify, add_lease).start()

        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful

            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                sm = prr.data['sharemap']
                assert isinstance(sm, DictOfSets), sm
                servers_responding = set(prr.data['servers-responding'])
                good_hosts = self._merge_repair_sharemap(
                    sm, ur.sharemap, servers_responding)
                # sorted, so the reported list is deterministic
                prr.data['servers-responding'] = sorted(servers_responding)
                prr.data['count-shares-good'] = len(sm)
                prr.data['count-good-share-hosts'] = good_hosts
                is_healthy = bool(len(sm) >= verifycap.total_shares)
                is_recoverable = bool(len(sm) >= verifycap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr

            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f

            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            rd = r.start()
            rd.addCallbacks(_gather_repair_results, _repair_error)
            return rd

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check this file's shares without repairing; returns the Deferred
        produced by Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()
164
class DecryptingConsumer:
    """Pass-through IConsumer that decrypts everything written to it.

    A CiphertextDownloader (acting as the Producer) writes ciphertext into
    this object. Each chunk is decrypted and handed to the wrapped consumer.
    Producer (un)registration is forwarded untouched, so the wrapped
    consumer flow-controls the real Producer directly, while the Producer
    only ever sees this object as its consumer.
    """
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        # TODO: pycryptopp CTR-mode needs random-access operations; ideally
        # AES(readkey, offset), or AES(readkey, offset=0) plus
        # process(ciphertext, offset=xyz). Until then, emulate it: start the
        # counter at the 16-byte block containing `offset` (through iv=),
        # then run zero bytes through the cipher to advance the keystream to
        # `offset` within that block.
        block_index, skip_bytes = divmod(offset, 16)
        counter_start = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=counter_start)
        self._decryptor.process("\x00" * skip_bytes)

    def set_download_status_read_event(self, read_ev):
        """Remember the status event that write() reports decryption time to."""
        self._read_ev = read_ev

    def registerProducer(self, producer, streaming):
        # Forwarded as-is: the real consumer flow-controls the real producer,
        # so no IPushProducer methods are needed here. Only write() is
        # intercepted; every other IConsumer method is a pass-through.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt `ciphertext` and pass the plaintext to the wrapped consumer.

        When a read event has been set, the elapsed decryption time is
        reported to it as read_ev.update(0, elapsed, 0).
        """
        t_start = now()
        plaintext = self._decryptor.process(ciphertext)
        ev = self._read_ev
        if ev:
            ev.update(0, now() - t_start, 0)
        self._consumer.write(plaintext)
204
class ImmutableFileNode:
    """Filenode for an immutable (CHK) file, constructed from its read cap.

    Download, check and repair are delegated to a CiphertextFileNode built
    from the cap's verify cap. This class adds the read key so that read()
    can decrypt the ciphertext on the way to the caller's consumer.
    """
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return hash(self.u)
    def __eq__(self, other):
        """Nodes are equal when they wrap equal file caps."""
        if isinstance(other, ImmutableFileNode):
            return self.u == other.u
        return False
    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so it must be defined
        # as the exact negation; otherwise equal nodes would also compare !=.
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed plaintext bytes [offset, offset+size) to `consumer`.

        Ciphertext from the CiphertextFileNode passes through a
        DecryptingConsumer keyed with this file's read key. Returns a
        Deferred that fires with the caller's `consumer` (not the
        decrypting wrapper) when the read is finished.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        # immutable files have no write cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        """Return a Deferred that fires with get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        # `consumer` here is the module-level allmydata.util.consumer import
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size