]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
lazily create DownloadNode upon first read()/get_segment()
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.internet.interfaces import IConsumer
9
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata import uri
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode
20 from allmydata.immutable.downloader.status import DownloadStatus
21
class CiphertextFileNode:
    """I represent an immutable CHK file at the ciphertext level.

    I am built from a CHK verify-cap, so I can download, check, and repair
    the file's ciphertext but I hold no decryption key. The DownloadNode
    that does the actual fetching is created lazily, on the first read(),
    get_segment(), or get_segment_size() call.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status=None):
        """verifycap must be a uri.CHKFileVerifierURI. If download_status
        is None, a fresh DownloadStatus is created for this file and, when
        'history' is truthy, registered via history.add_download()."""
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        if download_status is None:
            ds = DownloadStatus(verifycap.storage_index, verifycap.size)
            if history:
                history.add_download(ds)
            download_status = ds
        self._terminator = terminator
        self._history = history
        self._download_status = download_status
        self._node = None # created lazily, on read()

    def _maybe_create_download_node(self):
        # Build the DownloadNode on first use only; later calls reuse it.
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None, read_ev=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size, read_ev)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        # return a Deferred that fires with the file's real segment size
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def _make_checker(self, monitor, verify, add_lease):
        # One Checker recipe shared by check() and check_and_repair(): it
        # is pointed at every server the storage broker knows about.
        return Checker(verifycap=self._verifycap,
                       servers=self._storage_broker.get_all_servers(),
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder,
                       monitor=monitor)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check my shares and repair them if the check says I am unhealthy.

        I return a Deferred that fires with a CheckAndRepairResults. When
        the check reports a healthy file, no repair is attempted and the
        post-repair results are the pre-repair results. Otherwise a Repairer
        is run. On success, the post-repair results are the check results
        updated with the shares the repairer placed. On failure, the failure
        is recorded in crr.repair_failure and passed on to the Deferred's
        errback chain.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        sh = self._secret_holder

        d = self._make_checker(monitor, verify, add_lease).start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)

            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful
            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = copy.deepcopy(cr.data)

                # sm is keyed by share number: len(sm) is the good-share
                # count below. Fold in the shares the repairer placed.
                sm = prr.data['sharemap']
                assert isinstance(sm, DictOfSets), sm
                sm.update(ur.sharemap)

                # NOTE: relies on ur.sharemap being {shnum: set(serverid)}.
                # The server ids are the *values*. The keys are share
                # numbers and must not end up in the server lists.
                # set.update (not set.union) mutates in place.
                servers_responding = set(prr.data['servers-responding'])
                for serverids in ur.sharemap.values():
                    servers_responding.update(serverids)
                prr.data['servers-responding'] = list(servers_responding)

                # count distinct servers that hold at least one good share
                good_share_hosts = set()
                for serverids in sm.values():
                    good_share_hosts.update(serverids)
                prr.data['count-shares-good'] = len(sm)
                prr.data['count-good-share-hosts'] = len(good_share_hosts)

                is_healthy = len(sm) >= verifycap.total_shares
                is_recoverable = len(sm) >= verifycap.needed_shares
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr
            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f
            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            d2 = r.start()
            d2.addCallbacks(_gather_repair_results, _repair_error)
            return d2

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check my shares (optionally verifying them and/or adding leases).
        I return the Deferred produced by Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()
157
158
class DecryptingConsumer:
    """I am an IConsumer shim that decrypts ciphertext on its way to the
    real consumer.

    Producer (un)registration is forwarded unchanged, so the real consumer
    sees the real producer and flow-controls it directly. Only write() is
    intercepted: each chunk is decrypted, the elapsed decryption time is
    passed to read_event.update(), and the plaintext is handed on.
    """
    implements(IConsumer)

    def __init__(self, consumer, readkey, offset, read_event):
        self._consumer = consumer
        self._read_event = read_event
        # TODO: pycryptopp's CTR mode offers no random access. Ideally we
        # would write AES(readkey, offset), or AES(readkey, offset=0)
        # followed by a.process(ciphertext, offset=xyz). Until then, emulate
        # it through iv=: start the counter at the 16-byte block that
        # contains 'offset', then consume the keystream bytes that precede
        # 'offset' within that block.
        block_index, skip = divmod(offset, 16)
        counter_start = binascii.unhexlify("%032x" % block_index)
        self._decryptor = AES(readkey, iv=counter_start)
        self._decryptor.process("\x00" * skip)

    # IConsumer pass-throughs: because registration reaches the real
    # consumer, no IPushProducer methods are needed here.
    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        t0 = now()
        plaintext = self._decryptor.process(ciphertext)
        self._read_event.update(0, now() - t0, 0)
        self._consumer.write(plaintext)
194
class ImmutableFileNode:
    """I am an immutable CHK file node that yields plaintext.

    I wrap a CiphertextFileNode together with the file's read key. Reads
    are routed through a DecryptingConsumer, and checking and repair are
    delegated to the ciphertext node.
    """
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        # I own the DownloadStatus, so read() can add read events to it, and
        # I share it with the ciphertext node.
        ds = DownloadStatus(verifycap.storage_index, verifycap.size)
        if history:
            history.add_download(ds)
        self._download_status = ds
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history, ds)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # Must be the exact negation of __eq__. The previous body returned
        # self.u.__eq__(other.u), which made equal nodes compare "!=".
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with plaintext starting at byte 'offset'.

        'size' is the number of bytes wanted; None means "to the end of the
        file". I return a Deferred that fires with 'consumer' when the read
        is finished.
        """
        # Bytes this read can actually produce, for the download-status read
        # event. An explicit 'size' is a length, not an end position, so
        # 'offset' must not be subtracted from it. Clamp it to what remains
        # after 'offset' instead.
        remaining = max(0, self.u.size - offset)
        if size is None:
            actual_size = remaining
        else:
            actual_size = min(size, remaining)
        read_ev = self._download_status.add_read_event(offset, actual_size,
                                                       now())
        decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
        d = self._cnode.read(decryptor, offset, size, read_ev)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)