import struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data
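
# This module implements the read side of SDMF mutable files: RetrieveStatus
# tracks the progress and timings of a single download, and Retrieve fetches
# enough shares of one known version, validates them against the share hash
# chain, FEC-decodes them, and decrypts the result.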

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
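
# Each Retrieve below creates one RetrieveStatus; its timings dict collects
# per-server fetch times plus the overall "fetch", "decode", "decrypt", and
# "total" durations recorded during the download.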

class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file,
    # and will use a single ServerMap instance.
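
    # Typical usage, roughly (assuming an existing mutable filenode and an
    # up-to-date ServerMap for it):
    #
    #   r = Retrieve(filenode, servermap, verinfo)
    #   d = r.download()
    #   d.addCallback(lambda contents: ...)
    #
    # download() returns a Deferred that fires with the plaintext contents
    # of the selected version, or with a Failure.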

    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
        self._node = filenode
        assert self._node._pubkey
        self._storage_index = filenode.get_storage_index()
        assert self._node._readkey
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node._pubkey
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = fetch_privkey
        if self._node._privkey:
            self._need_privkey = False

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def download(self):
        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)

        self.shares = {} # maps shnum to validated blocks

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(self.remaining_sharemap) >= k
        # we start with the lowest shnums we have available, since FEC is
        # faster if we're using "primary shares"
        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
        for shnum in self.active_shnums:
            # we use an arbitrary peer who has the share. If shares are
            # doubled up (more than one share per peer), we could make this
            # run faster by spreading the load among multiple peers. But the
            # algorithm to do that is more complicated than I want to write
            # right now, and a well-provisioned grid shouldn't have multiple
            # shares per peer anyway.
            peerid = list(self.remaining_sharemap[shnum])[0]
            self.get_data(shnum, peerid)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

        return self._done_deferred

    def get_data(self, shnum, peerid):
        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        ss = self.servermap.connections[peerid]
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        offsets = dict(offsets_tuple)

        # we read the checkstring, to make sure that the data we grab is
        # from the right version.
        readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]

        # We also read the data, and the hashes necessary to validate them
        # (share_hash_chain, block_hash_tree, share_data). We don't read the
        # signature or the pubkey, since that was handled during the
        # servermap phase, and we'll be comparing the share hash chain
        # against the roothash that was validated back then.
        readv.append( (offsets['share_hash_chain'],
                       offsets['enc_privkey'] - offsets['share_hash_chain']) )

        # if we need the private key (for repair), we also fetch that
        if self._need_privkey:
            readv.append( (offsets['enc_privkey'],
                           offsets['EOF'] - offsets['enc_privkey']) )
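
        # each in-flight query is keyed by a unique marker object, so that
        # _got_results/_query_failed can tell which request a response (or
        # error) belongs to when they remove the entry from
        # self._outstanding_queries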
        m = Marker()
        self._outstanding_queries[m] = (peerid, shnum, started)

        # ask the cache first
        got_from_cache = False
        datavs = []
        for (offset, length) in readv:
            (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
                                                       offset, length)
            if data is not None:
                datavs.append(data)
        if len(datavs) == len(readv):
            self.log("got data from cache")
            got_from_cache = True
            d = fireEventually({shnum: datavs})
            # datavs is a dict mapping shnum to a pair of strings
        else:
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
        self.remaining_sharemap.discard(shnum, peerid)

        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
        d.addErrback(self._query_failed, m, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        def _oops(f):
            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
                     shnum=shnum,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     failure=f,
                     level=log.WEIRD, umid="W0xnQA")
        d.addErrback(_oops)
        d.addBoth(self._check_for_done)
        # any error during _check_for_done means the download fails. If the
        # download is successful, _check_for_done will fire _done by itself.
        d.addErrback(self._done)
        d.addErrback(log.err)
        return d # purely for testing convenience

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Retrieve and override it
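        # slot_readv returns a dict mapping each requested shnum to a list of
        # strings, one string per (offset, length) pair in readv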
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def remove_peer(self, peerid):
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, peerid)

    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
        now = time.time()
        elapsed = now - started
        if not got_from_cache:
            self._status.add_fetch_timing(peerid, elapsed)
        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                 shares=len(datavs),
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return

        # note that we only ask for a single share per query, so we only
        # expect a single share back. On the other hand, we use the extra
        # shares if we get them; that seems better than an assert().
        for shnum,datav in datavs.items():
            (prefix, hash_and_data) = datav[:2]
            try:
                self._got_results_one_share(shnum, peerid,
                                            prefix, hash_and_data)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s",
                         f_value=str(f.value), failure=f,
                         level=log.WEIRD, umid="7fzWZw")
                self.notify_server_corruption(peerid, shnum, str(e))
                self.remove_peer(peerid)
                self.servermap.mark_bad_share(peerid, shnum, prefix)
                self._bad_shares.add( (peerid, shnum) )
                self._status.problems[peerid] = f
                self._last_failure = f
            if self._need_privkey and len(datav) > 2:
                lp = None
                self._try_to_validate_privkey(datav[2], peerid, shnum, lp)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self.servermap.connections[peerid]
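        # callRemoteOnly is fire-and-forget: we don't wait for (or care
        # about) the server's response to this corruption advisory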
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, peerid,
                               got_prefix, got_hash_and_data):
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
        if got_prefix != prefix:
            msg = "someone wrote to the data since we read the servermap: prefix changed"
            raise UncoordinatedWriteError(msg)
        (share_hash_chain, block_hash_tree,
         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
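        # the share_hash_chain supplies the interior nodes needed to connect
        # this share's leaf hash to the root of the N-leaf tree; set_hashes
        # raises if the chain doesn't hash up to the root we just pinned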
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)
        self.log(" data valid! len=%d" % len(share_data))
        # each query comes down to this: placing validated share data into
        # self.shares
        self.shares[shnum] = share_data

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="YIw4tA")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False

    def _query_failed(self, f, marker, peerid):
        self.log(format="query to [%(peerid)s] failed",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._status.problems[peerid] = f
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return
        self._last_failure = f
        self.remove_peer(peerid)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")

    def _check_for_done(self, res):
        # exit paths:
        #  return : keep waiting, no new queries
        #  return self._maybe_send_more_queries(k) : send some more queries
        #  fire self._done(plaintext) : download successful
        #  raise exception : download fails

        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
                 running=self._running, decoding=self._decoding,
                 level=log.NOISY)
        if not self._running:
            return
        if self._decoding:
            return
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        if len(self.shares) < k:
            # we don't have enough shares yet
            return self._maybe_send_more_queries(k)
        if self._need_privkey:
            # we got k shares, but none of them had a valid privkey. TODO:
            # look further. Adding code to do this is a bit complicated, and
            # I want to avoid that complication, and this should be pretty
            # rare (k shares with bitflips in the enc_privkey but not in the
            # data blocks). If we actually do get here, the subsequent repair
            # will fail for lack of a privkey.
            self.log("got k shares but still need_privkey, bummer",
                     level=log.WEIRD, umid="MdRHPA")

        # we have enough to finish. All the shares have had their hashes
        # checked, so if something fails at this point, we don't know how
        # to fix it, so the download will fail.

        self._decoding = True # avoid reentrancy
        self._status.set_status("decoding")
        now = time.time()
        elapsed = now - self._started
        self._status.timings["fetch"] = elapsed

        d = defer.maybeDeferred(self._decode)
        d.addCallback(self._decrypt, IV, self._node._readkey)
        d.addBoth(self._done)
        return d # purely for test convenience

    def _maybe_send_more_queries(self, k):
        # we don't have enough shares yet. Should we send out more queries?
        # There are some number of queries outstanding, each for a single
        # share. If we can generate 'needed_shares' additional queries, we do
        # so. If we can't, then we know this file is a goner, and we raise
        # NotEnoughSharesError.
        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
                         "outstanding=%(outstanding)d"),
                 have=len(self.shares), k=k,
                 outstanding=len(self._outstanding_queries),
                 level=log.NOISY)

        remaining_shares = k - len(self.shares)
        needed = remaining_shares - len(self._outstanding_queries)
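        # 'needed' is how many additional queries we must launch, beyond the
        # ones already in flight, to have a chance of reaching k shares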
        if not needed:
            # we have enough queries in flight already

            # TODO: but if they've been in flight for a long time, and we
            # have reason to believe that new queries might respond faster
            # (i.e. we've seen other queries come back faster), then consider
            # sending out new queries. This could help with peers which have
            # silently gone away since the servermap was updated, for which
            # we're still waiting for the 15-minute TCP disconnect to happen.
            self.log("enough queries are in flight, no more are needed",
                     level=log.NOISY)
            return

        outstanding_shnums = set([shnum
                                  for (peerid, shnum, started)
                                  in self._outstanding_queries.values()])
        # prefer low-numbered shares, they are more likely to be primary
        available_shnums = sorted(self.remaining_sharemap.keys())
        for shnum in available_shnums:
            if shnum in outstanding_shnums:
                # skip ones that are already in transit
                continue
            if shnum not in self.remaining_sharemap:
                # no servers for that shnum. note that DictOfSets removes
                # empty sets from the dict for us.
                continue
            peerid = list(self.remaining_sharemap[shnum])[0]
            # get_data will remove that peerid from the sharemap, and add the
            # query to self._outstanding_queries
            self._status.set_status("Retrieving More Shares")
            self.get_data(shnum, peerid)
            needed -= 1
            if not needed:
                break

        # at this point, we have as many outstanding queries as we can. If
        # needed!=0 then we might not have enough to recover the file.
        if needed:
            format = ("ran out of peers: "
                      "have %(have)d shares (k=%(k)d), "
                      "%(outstanding)d queries in flight, "
                      "need %(need)d more, "
                      "found %(bad)d bad shares")
            args = {"have": len(self.shares),
                    "k": k,
                    "outstanding": len(self._outstanding_queries),
                    "need": needed,
                    "bad": len(self._bad_shares),
                    }
            self.log(format=format,
                     level=log.WEIRD, umid="ezTfjw", **args)
            err = NotEnoughSharesError("%s, last failure: %s" %
                                       (format % args, self._last_failure))
            if self._bad_shares:
                self.log("We found some bad shares this pass. You should "
                         "update the servermap and try again to check "
                         "more peers",
                         level=log.WEIRD, umid="EFkOlA")
                err.servermap = self.servermap
            raise err

    def _decode(self):
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # self.shares is a dict mapping shnum to share data, but the codec
        # wants two lists.
        shareids = []; shares = []
        for shareid, share in self.shares.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= k, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:k]
        shares = shares[:k]

        fec = codec.CRSDecoder()
        fec.set_params(segsize, k, N)
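        # set_params takes the segment size plus the k-of-N encoding
        # parameters; with SDMF's single segment, decode() can rebuild the
        # whole segment from any k share blocks plus their share ids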

        self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)
        def _done(buffers):
            self._status.timings["decode"] = time.time() - started
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            # zfec pads the data out to whole blocks, so trim the decoded
            # segment back down to the version's real datalength
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment
        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_done)
        d.addErrback(_err)
        return d

    def _decrypt(self, crypttext, IV, readkey):
        self._status.set_status("decrypting")
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
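        # the AES key is derived from the per-version IV plus the file's
        # readkey, so each version of the file is encrypted under its own key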
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        self._status.timings["decrypt"] = time.time() - started
        return plaintext

    def _done(self, res):
        if not self._running:
            return
        self._running = False
        self._status.set_active(False)
        self._status.timings["total"] = time.time() - self._started
        # res is either the new contents, or a Failure
        if isinstance(res, failure.Failure):
            self.log("Retrieve done, with failure", failure=res,
                     level=log.UNUSUAL)
            self._status.set_status("Failed")
        else:
            self.log("Retrieve done, success!")
            self._status.set_status("Finished")
            self._status.set_progress(1.0)
            # remember the encoding parameters, use them again next time
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = self.verinfo
            self._node._populate_required_shares(k)
            self._node._populate_total_shares(N)
        eventually(self._done_deferred.callback, res)