from twisted.application import service
from foolscap.eventual import eventually
-from allmydata.util import base32, mathutil, hashutil, log
+from allmydata.util import base32, mathutil, hashutil, log, idlib
from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
self._size = u.size
self._num_needed_shares = u.needed_shares
+ self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
self._status = s = DownloadStatus()
dl = []
for (peerid,ss) in self._client.get_permuted_peers("storage",
self._storage_index):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
d = ss.callRemote("get_buckets", self._storage_index)
- d.addCallbacks(self._got_response, self._got_error)
+ d.addCallbacks(self._got_response, self._got_error,
+ callbackArgs=(peerid_s,))
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
self._queries_sent))
return defer.DeferredList(dl)
- def _got_response(self, buckets):
+ def _got_response(self, buckets, peerid_s):
self._responses_received += 1
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
for sharenum, bucket in buckets.iteritems():
- b = storage.ReadBucketProxy(bucket)
+ b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
if h != self._uri_extension_hash:
self._fetch_failures["uri_extension"] += 1
msg = ("The copy of uri_extension we received from "
- "%s was bad" % bucket)
+ "%s was bad: wanted %s, got %s" %
+ (bucket,
+ base32.b2a(self._uri_extension_hash),
+ base32.b2a(h)))
+ self.log(msg, level=log.SCARY)
raise BadURIExtensionHashValue(msg)
return self._unpack_uri_extension_data(proposal)
return self._obtain_validated_thing(None,
d.addCallback(lambda res: getattr(bucket, methname)(*args))
d.addCallback(validatorfunc, bucket)
def _bad(f):
- self.log("WEIRD: %s from vbucket %s failed: %s" % (name, bucket, f))
+ self.log("%s from vbucket %s failed:" % (name, bucket),
+ failure=f, level=log.WEIRD)
if not sources:
raise NotEnoughPeersError("ran out of peers, last error was %s"
% (f,))
if k not in self._sharemap:
self._sharemap[k] = []
self._sharemap[k].append(peerid)
- self._readers.update(buckets.values())
+ self._readers.update( [ (bucket, peerid)
+ for bucket in buckets.values() ] )
def _got_error(self, f):
if f.check(KeyError):
if not self._readers:
self.log("no readers, so no UEB", level=log.NOISY)
return
- b = self._readers.pop()
- rbp = storage.ReadBucketProxy(b)
+ b,peerid = self._readers.pop()
+ rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
+ storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())
d.addCallback(self._got_uri_extension)