]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
Refactor StorageFarmBroker handling of servers
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import binascii
3 import copy
4 import time
5 now = time.time
6 from zope.interface import implements, Interface
7 from twisted.internet import defer
8 from twisted.internet.interfaces import IConsumer
9
10 from allmydata.interfaces import IImmutableFileNode, IUploadResults
11 from allmydata import uri
12 from allmydata.check_results import CheckResults, CheckAndRepairResults
13 from allmydata.util.dictutil import DictOfSets
14 from pycryptopp.cipher.aes import AES
15
16 # local imports
17 from allmydata.immutable.checker import Checker
18 from allmydata.immutable.repairer import Repairer
19 from allmydata.immutable.downloader.node import DownloadNode
20 from allmydata.immutable.downloader.status import DownloadStatus
21
class IDownloadStatusHandlingConsumer(Interface):
    """Optional interface for consumers that want the download read event.

    CiphertextFileNode.read() checks whether its consumer provides this
    interface and, if it does, hands it the DownloadStatus read event that
    was created for that read.
    """

    def set_download_status_read_event(read_ev):
        """Accept the DownloadStatus 'read event' for the current read.

        The consumer is expected to add to this event the time it spends
        decrypting each chunk of data that passes through it."""
class CiphertextFileNode:
    """I give access to the ciphertext of an immutable (CHK) file.

    I am built from a CHK verify-cap. I can download, check and repair the
    file's shares, but I hold no key to decrypt them. ImmutableFileNode wraps
    me to add decryption. The DownloadNode that does the actual fetching is
    created lazily, on the first read(), get_segment() or
    get_segment_size() call.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None # created lazily, by _maybe_create_download_node()

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and the DownloadNode, if not done yet.

        The DownloadStatus is registered with self._history when a history
        was supplied. Every later read on this node records its read event
        in this same status object.
        """
        if self._download_status is None:
            ds = DownloadStatus(self._verifycap.storage_index,
                                self._verifycap.size)
            if self._history:
                self._history.add_download(ds)
            self._download_status = ds
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        size=None means "from offset through the end of the file". A read
        event covering the requested range is added to the DownloadStatus.
        If the consumer provides IDownloadStatusHandlingConsumer, it is
        handed that event so it can record its own per-chunk time in it.
        """
        self._maybe_create_download_node()
        actual_size = size
        if actual_size is None:
            actual_size = self._verifycap.size - offset
        read_ev = self._download_status.add_read_event(offset, actual_size,
                                                       now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)
        return self._node.read(consumer, offset, size, read_ev)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index
    def get_verify_cap(self):
        return self._verifycap
    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def _make_checker(self, monitor, verify, add_lease):
        """Return a Checker for this file over the currently-connected servers."""
        servers = self._storage_broker.get_connected_servers()
        return Checker(verifycap=self._verifycap, servers=servers,
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder, monitor=monitor)

    @staticmethod
    def _merge_repair_results(pre_repair_data, repaired_sharemap):
        """Return a post-repair copy of a check-results data dict.

        pre_repair_data: the check results' data dict. Only its 'sharemap'
          (share number -> set of server ids) and 'servers-responding'
          (list of server ids) entries are read. It is deep-copied and
          never modified.
        repaired_sharemap: the repair upload's sharemap, with the same
          shape as 'sharemap'.

        The copy has the repaired shares merged into 'sharemap' and the
        servers now holding them added to 'servers-responding'. Its
        'count-shares-good' (distinct shares) and 'count-good-share-hosts'
        (distinct servers holding at least one share) are recomputed.
        """
        data = copy.deepcopy(pre_repair_data)
        sm = data['sharemap']
        # Merge per share: a share may now live on both old and new servers.
        for shnum, serverids in repaired_sharemap.items():
            if shnum in sm:
                sm[shnum].update(serverids)
            else:
                sm[shnum] = set(serverids)
        # Servers that accepted repaired shares evidently responded too.
        responding = set(data['servers-responding'])
        for serverids in repaired_sharemap.values():
            responding.update(serverids)
        data['servers-responding'] = list(responding)
        data['count-shares-good'] = len(sm)
        hosts = set()
        for serverids in sm.values():
            hosts.update(serverids)
        data['count-good-share-hosts'] = len(hosts)
        return data

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the file and repair it if the check says it is unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults.

        - If the check reports the file healthy, no repair is attempted and
          the check results also serve as the post-repair results.
        - Otherwise a Repairer is run. Its upload results are merged into a
          copy of the check results to form the post-repair results.
        - If the repair fails, the failure is recorded on the
          CheckAndRepairResults and then propagated through the Deferred.
        """
        verifycap = self._verifycap
        storage_index = verifycap.storage_index
        sb = self._storage_broker
        sh = self._secret_holder

        d = self._make_checker(monitor, verify, add_lease).start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)
            crr.repair_attempted = True
            crr.repair_successful = False # until proven successful
            def _gather_repair_results(ur):
                assert IUploadResults.providedBy(ur), ur
                pre_sharemap = cr.data['sharemap']
                assert isinstance(pre_sharemap, DictOfSets), pre_sharemap
                # clone the cr (check results) to form the basis of the
                # prr (post-repair results)
                prr = CheckResults(cr.uri, cr.storage_index)
                prr.data = self._merge_repair_results(cr.data, ur.sharemap)
                num_shares = prr.data['count-shares-good']
                is_healthy = bool(num_shares >= verifycap.total_shares)
                is_recoverable = bool(num_shares >= verifycap.needed_shares)
                prr.set_healthy(is_healthy)
                prr.set_recoverable(is_recoverable)
                crr.repair_successful = is_healthy
                # NOTE(review): this reuses the "healthy" test. It may not be
                # the right rebalancing criterion -- confirm.
                prr.set_needs_rebalancing(num_shares >= verifycap.total_shares)

                crr.post_repair_results = prr
                return crr
            def _repair_error(f):
                # as with mutable repair, I'm not sure if I want to pass
                # through a failure or not. TODO
                crr.repair_successful = False
                crr.repair_failure = f
                return f
            r = Repairer(self, storage_broker=sb, secret_holder=sh,
                         monitor=monitor)
            rd = r.start()
            rd.addCallbacks(_gather_repair_results, _repair_error)
            return rd

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check the file's shares. Returns the Checker's Deferred of check results."""
        return self._make_checker(monitor, verify, add_lease).start()
168                     monitor=monitor)
169         return v.start()
170
171 class DecryptingConsumer:
172     """I sit between a CiphertextDownloader (which acts as a Producer) and
173     the real Consumer, decrypting everything that passes by. The real
174     Consumer sees the real Producer, but the Producer sees us instead of the
175     real consumer."""
176     implements(IConsumer, IDownloadStatusHandlingConsumer)
177
178     def __init__(self, consumer, readkey, offset):
179         self._consumer = consumer
180         self._read_event = None
181         # TODO: pycryptopp CTR-mode needs random-access operations: I want
182         # either a=AES(readkey, offset) or better yet both of:
183         #  a=AES(readkey, offset=0)
184         #  a.process(ciphertext, offset=xyz)
185         # For now, we fake it with the existing iv= argument.
186         offset_big = offset // 16
187         offset_small = offset % 16
188         iv = binascii.unhexlify("%032x" % offset_big)
189         self._decryptor = AES(readkey, iv=iv)
190         self._decryptor.process("\x00"*offset_small)
191
192     def set_download_status_read_event(self, read_ev):
193         self._read_event = read_ev
194
195     def registerProducer(self, producer, streaming):
196         # this passes through, so the real consumer can flow-control the real
197         # producer. Therefore we don't need to provide any IPushProducer
198         # methods. We implement all the IConsumer methods as pass-throughs,
199         # and only intercept write() to perform decryption.
200         self._consumer.registerProducer(producer, streaming)
201     def unregisterProducer(self):
202         self._consumer.unregisterProducer()
203     def write(self, ciphertext):
204         started = now()
205         plaintext = self._decryptor.process(ciphertext)
206         if self._read_event:
207             elapsed = now() - started
208             self._read_event.update(0, elapsed, 0)
209         self._consumer.write(plaintext)
210
211 class ImmutableFileNode:
212     implements(IImmutableFileNode)
213
214     # I wrap a CiphertextFileNode with a decryption key
215     def __init__(self, filecap, storage_broker, secret_holder, terminator,
216                  history):
217         assert isinstance(filecap, uri.CHKFileURI)
218         verifycap = filecap.get_verify_cap()
219         self._cnode = CiphertextFileNode(verifycap, storage_broker,
220                                          secret_holder, terminator, history)
221         assert isinstance(filecap, uri.CHKFileURI)
222         self.u = filecap
223         self._readkey = filecap.key
224
225     # TODO: I'm not sure about this.. what's the use case for node==node? If
226     # we keep it here, we should also put this on CiphertextFileNode
227     def __hash__(self):
228         return self.u.__hash__()
229     def __eq__(self, other):
230         if isinstance(other, ImmutableFileNode):
231             return self.u.__eq__(other.u)
232         else:
233             return False
234     def __ne__(self, other):
235         if isinstance(other, ImmutableFileNode):
236             return self.u.__eq__(other.u)
237         else:
238             return True
239
240     def read(self, consumer, offset=0, size=None):
241         decryptor = DecryptingConsumer(consumer, self._readkey, offset)
242         d = self._cnode.read(decryptor, offset, size)
243         d.addCallback(lambda dc: consumer)
244         return d
245
246     def raise_error(self):
247         pass
248
249     def get_write_uri(self):
250         return None
251
252     def get_readonly_uri(self):
253         return self.get_uri()
254
255     def get_uri(self):
256         return self.u.to_string()
257     def get_cap(self):
258         return self.u
259     def get_readcap(self):
260         return self.u.get_readonly()
261     def get_verify_cap(self):
262         return self.u.get_verify_cap()
263     def get_repair_cap(self):
264         # CHK files can be repaired with just the verifycap
265         return self.u.get_verify_cap()
266
267     def get_storage_index(self):
268         return self.u.get_storage_index()
269
270     def get_size(self):
271         return self.u.get_size()
272     def get_current_size(self):
273         return defer.succeed(self.get_size())
274
275     def is_mutable(self):
276         return False
277
278     def is_readonly(self):
279         return True
280
281     def is_unknown(self):
282         return False
283
284     def is_allowed_in_immutable_directory(self):
285         return True
286
287     def check_and_repair(self, monitor, verify=False, add_lease=False):
288         return self._cnode.check_and_repair(monitor, verify, add_lease)
289     def check(self, monitor, verify=False, add_lease=False):
290         return self._cnode.check(monitor, verify, add_lease)