# src/allmydata/mutable/retrieve.py

import struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap import DeadReferenceError
from foolscap.eventual import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus
from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec, storage
from allmydata.immutable.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES

from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
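    #
    # A rough usage sketch (illustrative only; the mutable filenode is the
    # expected caller, and the callbacks named here are hypothetical):
    #
    #   r = Retrieve(filenode, servermap, verinfo)
    #   d = r.download()                  # Deferred
    #   d.addCallback(handle_plaintext)   # fires with the recovered contents
    #   d.addErrback(handle_failure)      # or errbacks with a Failure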

    def __init__(self, filenode, servermap, verinfo):
        self._node = filenode
        assert self._node._pubkey
        self._storage_index = filenode.get_storage_index()
        assert self._node._readkey
        self._last_failure = None
        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps Marker to (peerid, shnum, start_time)
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node._pubkey
        self.verinfo = verinfo

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def download(self):
        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)
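        # remaining_sharemap now maps shnum to the set of peerids holding
        # that share, e.g. (illustrative values):
        #   {0: set([peerA]), 1: set([peerA, peerB]), 3: set([peerC])}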

        self.shares = {} # maps shnum to validated blocks

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(self.remaining_sharemap) >= k
        # we start with the lowest shnums we have available, since FEC is
        # faster if we're using "primary shares"
        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
        for shnum in self.active_shnums:
            # we use an arbitrary peer who has the share. If shares are
            # doubled up (more than one share per peer), we could make this
            # run faster by spreading the load among multiple peers. But the
            # algorithm to do that is more complicated than I want to write
            # right now, and a well-provisioned grid shouldn't have multiple
            # shares per peer.
            peerid = list(self.remaining_sharemap[shnum])[0]
            self.get_data(shnum, peerid)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
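        # Roughly (a sketch of the flow through the methods below):
        #   _got_results / _query_failed  ->  _check_for_done
        #   _check_for_done: keep waiting, or _maybe_send_more_queries()
        #     (which may raise NotEnoughSharesError), or
        #     _decode -> _decrypt -> _done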

        return self._done_deferred

    def get_data(self, shnum, peerid):
        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        ss = self.servermap.connections[peerid]
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        offsets = dict(offsets_tuple)
        # we read the checkstring, to make sure that the data we grab is from
        # the right version. We also read the data, and the hashes necessary
        # to validate them (share_hash_chain, block_hash_tree, share_data).
        # We don't read the signature or the pubkey, since that was handled
        # during the servermap phase, and we'll be comparing the share hash
        # chain against the roothash that was validated back then.
        readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
                  (offsets['share_hash_chain'],
                   offsets['enc_privkey'] - offsets['share_hash_chain']),
                  ]
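        # readv is a list of (offset, length) read vectors. The response
        # (from slot_readv, or assembled from the cache below) is a dict
        # mapping shnum to a list of strings, one per vector: here that is
        # [signed_prefix, hashes_and_share_data].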

        m = Marker()
        self._outstanding_queries[m] = (peerid, shnum, started)

        # ask the cache first
        got_from_cache = False
        datavs = []
        for (offset, length) in readv:
            (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
                                                       offset, length)
            if data is not None:
                datavs.append(data)
        if len(datavs) == len(readv):
            self.log("got data from cache")
            got_from_cache = True
            d = fireEventually({shnum: datavs})
            # this mimics a slot_readv response: a dict mapping shnum to
            # the list of strings we read
        else:
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
        self.remaining_sharemap.discard(shnum, peerid)

        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
        d.addErrback(self._query_failed, m, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        def _oops(f):
            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
                     shnum=shnum,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     failure=f,
                     level=log.WEIRD, umid="W0xnQA")
        d.addErrback(_oops)
        d.addBoth(self._check_for_done)
        # any error during _check_for_done means the download fails. If the
        # download is successful, _check_for_done will fire _done by itself.
        d.addErrback(self._done)
        d.addErrback(log.err)
        return d # purely for testing convenience

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Retrieve and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def remove_peer(self, peerid):
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, peerid)

    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
        now = time.time()
        elapsed = now - started
        if not got_from_cache:
            self._status.add_fetch_timing(peerid, elapsed)
        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                 shares=len(datavs),
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return

        # note that we only ask for a single share per query, so we only
        # expect a single share back. On the other hand, we use any extra
        # shares if we get them; that seems better than an assert().

        for shnum,datav in datavs.items():
            (prefix, hash_and_data) = datav
            try:
                self._got_results_one_share(shnum, peerid,
                                            prefix, hash_and_data)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s",
                         f_value=str(f.value), failure=f,
                         level=log.WEIRD, umid="7fzWZw")
                self.remove_peer(peerid)
                self.servermap.mark_bad_share(peerid, shnum, prefix)
                self._bad_shares.add( (peerid, shnum) )
                self._status.problems[peerid] = f
                self._last_failure = f
                pass
        # all done!

    def _got_results_one_share(self, shnum, peerid,
                               got_prefix, got_hash_and_data):
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
        if got_prefix != prefix:
            msg = "someone wrote to the data since we read the servermap: prefix changed"
            raise UncoordinatedWriteError(msg)
        (share_hash_chain, block_hash_tree,
         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
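        # share_hash_chain is a dict mapping hash-tree node numbers to hash
        # values: the uncle hashes needed to connect our single leaf (this
        # share's block-hash-tree root) up to root_hash, which the signature
        # already vouched for.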
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)
        self.log(" data valid! len=%d" % len(share_data))
        # each query comes down to this: placing validated share data into
        # self.shares
        self.shares[shnum] = share_data

    def _query_failed(self, f, marker, peerid):
        self.log(format="query to [%(peerid)s] failed",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._status.problems[peerid] = f
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return
        self._last_failure = f
        self.remove_peer(peerid)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")

    def _check_for_done(self, res):
        # exit paths:
        #  return : keep waiting, no new queries
        #  return self._maybe_send_more_queries(k) : send some more queries
        #  fire self._done(plaintext) : download successful
        #  raise exception : download fails

        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
                 running=self._running, decoding=self._decoding,
                 level=log.NOISY)
        if not self._running:
            return
        if self._decoding:
            return
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        if len(self.shares) < k:
            # we don't have enough shares yet
            return self._maybe_send_more_queries(k)

        # we have enough to finish. All the shares have had their hashes
        # checked, so if something fails at this point, we don't know how
        # to fix it, so the download will fail.

        self._decoding = True # avoid reentrancy
        self._status.set_status("decoding")
        now = time.time()
        elapsed = now - self._started
        self._status.timings["fetch"] = elapsed

        d = defer.maybeDeferred(self._decode)
        d.addCallback(self._decrypt, IV, self._node._readkey)
        d.addBoth(self._done)
        return d # purely for test convenience

    def _maybe_send_more_queries(self, k):
        # we don't have enough shares yet. Should we send out more queries?
        # There are some number of queries outstanding, each for a single
        # share. If we can generate 'needed_shares' additional queries, we do
        # so. If we can't, then we know this file is a goner, and we raise
        # NotEnoughSharesError.
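        # Worked example (illustrative numbers): with k=3, one validated
        # share, and one query still in flight, we need 3-1-1 = 1 more
        # query; if no un-queried share remains, we give up on this pass.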
        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
                         "outstanding=%(outstanding)d"),
                 have=len(self.shares), k=k,
                 outstanding=len(self._outstanding_queries),
                 level=log.NOISY)

        remaining_shares = k - len(self.shares)
        needed = remaining_shares - len(self._outstanding_queries)
        if not needed:
            # we have enough queries in flight already

            # TODO: but if they've been in flight for a long time, and we
            # have reason to believe that new queries might respond faster
            # (i.e. we've seen other queries come back faster), then consider
            # sending out new queries. This could help with peers which have
            # silently gone away since the servermap was updated, for which
            # we're still waiting for the 15-minute TCP disconnect to happen.
            self.log("enough queries are in flight, no more are needed",
                     level=log.NOISY)
            return

        outstanding_shnums = set([shnum
                                  for (peerid, shnum, started)
                                  in self._outstanding_queries.values()])
        # prefer low-numbered shares, they are more likely to be primary
        available_shnums = sorted(self.remaining_sharemap.keys())
        for shnum in available_shnums:
            if shnum in outstanding_shnums:
                # skip ones that are already in transit
                continue
            if shnum not in self.remaining_sharemap:
                # no servers for that shnum. note that DictOfSets removes
                # empty sets from the dict for us.
                continue
            peerid = list(self.remaining_sharemap[shnum])[0]
            # get_data will remove that peerid from the sharemap, and add the
            # query to self._outstanding_queries
            self._status.set_status("Retrieving More Shares")
            self.get_data(shnum, peerid)
            needed -= 1
            if not needed:
                break

        # at this point, we have as many queries outstanding as we can send.
        # If needed != 0 then we might not have enough to recover the file.
        if needed:
            format = ("ran out of peers: "
                      "have %(have)d shares (k=%(k)d), "
                      "%(outstanding)d queries in flight, "
                      "need %(need)d more, "
                      "found %(bad)d bad shares")
            args = {"have": len(self.shares),
                    "k": k,
                    "outstanding": len(self._outstanding_queries),
                    "need": needed,
                    "bad": len(self._bad_shares),
                    }
            self.log(format=format,
                     level=log.WEIRD, umid="ezTfjw", **args)
            err = NotEnoughSharesError("%s, last failure: %s" %
                                      (format % args, self._last_failure))
            if self._bad_shares:
                self.log("We found some bad shares this pass. You should "
                         "update the servermap and try again to check "
                         "more peers",
                         level=log.WEIRD, umid="EFkOlA")
                err.servermap = self.servermap
            raise err

        return

    def _decode(self):
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # self.shares is a dict mapping shnum to share data, but the codec
        # wants two parallel lists.
        shareids = []; shares = []
        for shareid, share in self.shares.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= k, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:k]
        shares = shares[:k]

        fec = codec.CRSDecoder()
        params = "%d-%d-%d" % (segsize, k, N)
        fec.set_serialized_params(params)
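        # 'params' serializes the codec parameters as "segsize-k-N", e.g.
        # (illustrative) "1000-3-10" for a 1000-byte segment encoded 3-of-10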

        self.log("params %s, we have %d shares" % (params, len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)
        def _done(buffers):
            self._status.timings["decode"] = time.time() - started
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment
        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_done)
        d.addErrback(_err)
        return d

    def _decrypt(self, crypttext, IV, readkey):
        self._status.set_status("decrypting")
        started = time.time()
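        # the AES key is derived from the file's readkey plus this version's
        # IV, so each version of the mutable file is encrypted under its own
        # distinct key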
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        self._status.timings["decrypt"] = time.time() - started
        return plaintext

    def _done(self, res):
        if not self._running:
            return
        self._running = False
        self._status.set_active(False)
        self._status.timings["total"] = time.time() - self._started
        # res is either the new contents, or a Failure
        if isinstance(res, failure.Failure):
            self.log("Retrieve done, with failure", failure=res,
                     level=log.UNUSUAL)
            self._status.set_status("Failed")
        else:
            self.log("Retrieve done, success!")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
            # remember the encoding parameters, use them again next time
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = self.verinfo
            self._node._populate_required_shares(k)
            self._node._populate_total_shares(N)
        eventually(self._done_deferred.callback, res)