import struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap import DeadReferenceError
from foolscap.eventual import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus
from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec, storage
from allmydata.immutable.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES

from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data
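
# Typical use (a sketch: the surrounding mutable-filenode code normally
# drives this, and everything here other than Retrieve/download is
# illustrative):
#
#     r = Retrieve(filenode, servermap, verinfo)
#     d = r.download()
#     d.addCallback(lambda plaintext: ...)
#
# download() returns a Deferred that fires with the plaintext of the chosen
# version, or with a Failure such as NotEnoughSharesError.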

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.status = "Not started"
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
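
# (RetrieveStatus objects are read through the IRetrieveStatus interface,
#  presumably by the status-display machinery; the Retrieve class below
#  drives the setters as the download progresses.)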

class Marker:
    # opaque key for self._outstanding_queries: identity is all that matters
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file,
    # and will use a single ServerMap instance.

    def __init__(self, filenode, servermap, verinfo):
        self._node = filenode
        assert self._node._pubkey
        self._storage_index = filenode.get_storage_index()
        assert self._node._readkey
        self._last_failure = None
        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node._pubkey
        self.verinfo = verinfo

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def download(self):
        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)

        self.shares = {} # maps shnum to validated blocks

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(self.remaining_sharemap) >= k
        # we start with the lowest shnums we have available, since FEC is
        # faster if we're using "primary shares"
        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
        for shnum in self.active_shnums:
            # we use an arbitrary peer who has the share. If shares are
            # doubled up (more than one share per peer), we could make this
            # run faster by spreading the load among multiple peers. But the
            # algorithm to do that is more complicated than I want to write
            # right now, and a well-provisioned grid shouldn't have multiple
            # shares per peer.
            peerid = list(self.remaining_sharemap[shnum])[0]
            self.get_data(shnum, peerid)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
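        # (a rough sketch of that machine: each response arrives at
        #  _got_results or _query_failed, then falls through to
        #  _check_for_done, which either keeps waiting, asks
        #  _maybe_send_more_queries for replacements, or kicks off the
        #  _decode / _decrypt / _done sequence.)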

        return self._done_deferred

    def get_data(self, shnum, peerid):
        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        ss = self.servermap.connections[peerid]
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        offsets = dict(offsets_tuple)

        # we read the checkstring, to make sure that the data we grab is from
        # the right version. We also read the data, and the hashes necessary
        # to validate them (share_hash_chain, block_hash_tree, share_data).
        # We don't read the signature or the pubkey, since that was handled
        # during the servermap phase, and we'll be comparing the share hash
        # chain against the roothash that was validated back then.
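        # (concretely, readv asks for two spans: the signed prefix at offset
        #  0, and one contiguous region running from share_hash_chain up to,
        #  but not including, enc_privkey, which covers the share hash
        #  chain, the block hash tree, and the share data.)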
        readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
                  (offsets['share_hash_chain'],
                   offsets['enc_privkey'] - offsets['share_hash_chain']),
                  ]

        m = Marker()
        self._outstanding_queries[m] = (peerid, shnum, started)

        # ask the cache first
        got_from_cache = False
        datavs = []
        for (offset, length) in readv:
            (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
                                                       offset, length)
            if data is not None:
                datavs.append(data)
        if len(datavs) == len(readv):
            self.log("got data from cache")
            got_from_cache = True
            d = fireEventually({shnum: datavs})
            # ({shnum: datavs} mimics a remote read response: a dict mapping
            #  shnum to the list of strings read)
        else:
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
        self.remaining_sharemap.discard(shnum, peerid)
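        # (fireEventually keeps the cache branch asynchronous, so the rest
        #  of the callback chain behaves the same way whether the data came
        #  from the cache or from the wire.)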

        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
        d.addErrback(self._query_failed, m, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        def _oops(f):
            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
                     shnum=shnum,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     failure=f,
                     level=log.WEIRD, umid="W0xnQA")
        d.addErrback(_oops)
        d.addBoth(self._check_for_done)
        # any error during _check_for_done means the download fails. If the
        # download is successful, _check_for_done will fire _done by itself.
        d.addErrback(self._done)
        d.addErrback(log.err)
        return d # purely for testing convenience

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Retrieve and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def remove_peer(self, peerid):
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, peerid)

    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
        now = time.time()
        elapsed = now - started
        if not got_from_cache:
            self._status.add_fetch_timing(peerid, elapsed)
        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                 shares=len(datavs),
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return

        # note that we only ask for a single share per query, so we only
        # expect a single share back. On the other hand, we use the extra
        # shares if we get them: that seems better than an assert().
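        # each datav is the list of strings that our two-element readv
        # produced: the checkstring prefix, then the hashes-plus-data blob.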

        for shnum,datav in datavs.items():
            (prefix, hash_and_data) = datav
            try:
                self._got_results_one_share(shnum, peerid,
                                            prefix, hash_and_data)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s",
                         f_value=str(f.value), failure=f,
                         level=log.WEIRD, umid="7fzWZw")
                self.remove_peer(peerid)
                self.servermap.mark_bad_share(peerid, shnum, prefix)
                self._bad_shares.add( (peerid, shnum) )
                self._status.problems[peerid] = f
                self._last_failure = f

    def _got_results_one_share(self, shnum, peerid,
                               got_prefix, got_hash_and_data):
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
        if got_prefix != prefix:
            msg = "someone wrote to the data since we read the servermap: prefix changed"
            raise UncoordinatedWriteError(msg)
        (share_hash_chain, block_hash_tree,
         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)
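        # if we get here, the block hash tree ties share_data to
        # share_hash_leaf, and the share hash chain ties that leaf to
        # root_hash, which the servermap phase already checked against the
        # publisher's signature: the share is now fully validated.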
        self.log(" data valid! len=%d" % len(share_data))
        # each query comes down to this: placing validated share data into
        # self.shares
        self.shares[shnum] = share_data

    def _query_failed(self, f, marker, peerid):
        self.log(format="query to [%(peerid)s] failed",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._status.problems[peerid] = f
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return
        self._last_failure = f
        self.remove_peer(peerid)

        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")

    def _check_for_done(self, res):
        # exit paths:
        #  return : keep waiting, no new queries
        #  return self._maybe_send_more_queries(k) : send some more queries
        #  fire self._done(plaintext) : download successful
        #  raise exception : download fails

        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
                 running=self._running, decoding=self._decoding,
                 level=log.NOISY)
        if not self._running:
            return
        if self._decoding:
            return
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        if len(self.shares) < k:
            # we don't have enough shares yet
            return self._maybe_send_more_queries(k)

        # we have enough to finish. All the shares have had their hashes
        # checked, so if something fails at this point, we don't know how
        # to fix it, so the download will fail.

        self._decoding = True # avoid reentrancy
        self._status.set_status("decoding")
        now = time.time()
        elapsed = now - self._started
        self._status.timings["fetch"] = elapsed

        d = defer.maybeDeferred(self._decode)
        d.addCallback(self._decrypt, IV, self._node._readkey)
        d.addBoth(self._done)
        return d # purely for test convenience

    def _maybe_send_more_queries(self, k):
        # we don't have enough shares yet. Should we send out more queries?
        # There are some number of queries outstanding, each for a single
        # share. If we can generate 'needed_shares' additional queries, we do
        # so. If we can't, then we know this file is a goner, and we raise
        # NotEnoughSharesError.
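        # (worked example: with k=3, one validated share in hand, and one
        #  query still in flight, remaining_shares=2 and needed=1, so we try
        #  to send exactly one more query.)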
        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
                         "outstanding=%(outstanding)d"),
                 have=len(self.shares), k=k,
                 outstanding=len(self._outstanding_queries),
                 level=log.NOISY)

        remaining_shares = k - len(self.shares)
        needed = remaining_shares - len(self._outstanding_queries)
        if not needed:
            # we have enough queries in flight already

            # TODO: but if they've been in flight for a long time, and we
            # have reason to believe that new queries might respond faster
            # (i.e. we've seen other queries come back faster), then consider
            # sending out new queries. This could help with peers which have
            # silently gone away since the servermap was updated, for which
            # we're still waiting for the 15-minute TCP disconnect to happen.
            self.log("enough queries are in flight, no more are needed",
                     level=log.NOISY)
            return

        outstanding_shnums = set([shnum
                                  for (peerid, shnum, started)
                                  in self._outstanding_queries.values()])
        # prefer low-numbered shares, they are more likely to be primary
        available_shnums = sorted(self.remaining_sharemap.keys())
        for shnum in available_shnums:
            if shnum in outstanding_shnums:
                # skip ones that are already in transit
                continue
            if shnum not in self.remaining_sharemap:
                # no servers for that shnum. note that DictOfSets removes
                # empty sets from the dict for us.
                continue
            peerid = list(self.remaining_sharemap[shnum])[0]
            # get_data will remove that peerid from the sharemap, and add the
            # query to self._outstanding_queries
            self._status.set_status("Retrieving More Shares")
            self.get_data(shnum, peerid)
            needed -= 1
            if not needed:
                break

        # at this point, we have as many outstanding queries as we can. If
        # needed!=0 then we might not have enough to recover the file.
        if needed:
            format = ("ran out of peers: "
                      "have %(have)d shares (k=%(k)d), "
                      "%(outstanding)d queries in flight, "
                      "need %(need)d more, "
                      "found %(bad)d bad shares")
            args = {"have": len(self.shares),
                    "k": k,
                    "outstanding": len(self._outstanding_queries),
                    "need": needed,
                    "bad": len(self._bad_shares),
                    }
            self.log(format=format,
                     level=log.WEIRD, umid="ezTfjw", **args)
            err = NotEnoughSharesError("%s, last failure: %s" %
                                       (format % args, self._last_failure))
            if self._bad_shares:
                self.log("We found some bad shares this pass. You should "
                         "update the servermap and try again to check "
                         "more peers",
                         level=log.WEIRD, umid="EFkOlA")
                err.servermap = self.servermap
            raise err

    def _decode(self):
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # self.shares maps shnum to share data, but the codec wants two
        # matched lists
        shareids = []; shares = []
        for shareid, share in self.shares.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= k, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:k]
        shares = shares[:k]

        fec = codec.CRSDecoder()
        params = "%d-%d-%d" % (segsize, k, N)
        fec.set_serialized_params(params)
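        # the decoder must be parameterized exactly as the encoder was: the
        # serialized form is "segsize-k-N", e.g. "4096-3-10" for
        # segsize=4096, k=3, N=10.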

        self.log("params %s, we have %d shares" % (params, len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)
        def _done(buffers):
            self._status.timings["decode"] = time.time() - started
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment
        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_done)
        d.addErrback(_err)
        return d

    def _decrypt(self, crypttext, IV, readkey):
        self._status.set_status("decrypting")
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
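        # note that the AES key mixes this version's IV into the file's
        # readkey, so distinct IVs yield distinct encryption keys.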
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        self._status.timings["decrypt"] = time.time() - started
        return plaintext

    def _done(self, res):
        if not self._running:
            return
        self._running = False
        self._status.set_active(False)
        self._status.timings["total"] = time.time() - self._started
        # res is either the new contents, or a Failure
        if isinstance(res, failure.Failure):
            self.log("Retrieve done, with failure", failure=res,
                     level=log.UNUSUAL)
            self._status.set_status("Failed")
        else:
            self.log("Retrieve done, success!")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
            # remember the encoding parameters, use them again next time
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = self.verinfo
            self._node._populate_required_shares(k)
            self._node._populate_total_shares(N)
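        # fire the caller's Deferred in a later reactor turn, so that their
        # callbacks never run reentrantly inside _done.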
        eventually(self._done_deferred.callback, res)