rref = s.get_rref()
lease_seed = s.get_lease_seed()
- serverid = s.get_serverid()
if self._add_lease:
renew_secret = self._get_renewal_secret(lease_seed)
cancel_secret = self._get_cancel_secret(lease_seed)
d = rref.callRemote("get_buckets", storageindex)
def _wrap_results(res):
- return (res, serverid, True)
+ return (res, True)
def _trap_errs(f):
level = log.WEIRD
self.log("failure from server on 'get_buckets'; the REMOTE failure was:",
facility="tahoe.immutable.checker",
failure=f, level=level, umid="AX7wZQ")
- return ({}, serverid, False)
+ return ({}, False)
d.addCallbacks(_wrap_results, _trap_errs)
return d
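A minimal sketch of the callback shape this hunk adopts: success and
failure both produce a (result, success-flag) pair, and the caller keeps
track of which server it asked. The helper below uses a plain Deferred in
place of rref.callRemote; all names are illustrative, not Tahoe-LAFS API.

    from twisted.internet import defer

    def get_buckets_with_status(remote_call):
        # remote_call: any zero-argument callable returning a Deferred,
        # standing in for rref.callRemote("get_buckets", storageindex)
        d = remote_call()
        def _wrap_results(res):
            return (res, True)   # server answered: pass buckets through
        def _trap_errs(f):
            return ({}, False)   # server failed: empty buckets, flag it
        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    # a failing call still fires the deferred, here with ({}, False)
    d = get_buckets_with_status(lambda: defer.fail(RuntimeError("gone")))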
- def _download_and_verify(self, serverid, sharenum, bucket):
+ def _download_and_verify(self, server, sharenum, bucket):
"""Start an attempt to download and verify every block in this bucket
and return a deferred that will eventually fire once the attempt
completes with its results."""
vcap = self._verifycap
- b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
+ b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
veup = ValidatedExtendedURIProxy(b, vcap)
d = veup.start()
def _verify_server_shares(self, s):
""" Return a deferred which eventually fires with a tuple of
- (set(sharenum), serverid, set(corruptsharenum),
+ (set(sharenum), server, set(corruptsharenum),
set(incompatiblesharenum), success) showing all the shares verified
to be served by this server, and all the corrupt shares served by the
server, and all the incompatible shares served by the server. In case
the server is disconnected or returns a Failure, it fires with
(set(), server, set(), set(), False)."""
d = self._get_buckets(s, self._verifycap.get_storage_index())
def _got_buckets(result):
- bucketdict, serverid, success = result
+ bucketdict, success = result
shareverds = []
for (sharenum, bucket) in bucketdict.items():
- d = self._download_and_verify(serverid, sharenum, bucket)
+ d = self._download_and_verify(s, sharenum, bucket)
shareverds.append(d)
dl = deferredutil.gatherResults(shareverds)
corrupt.add(sharenum)
elif whynot == 'incompatible':
incompatible.add(sharenum)
- return (verified, serverid, corrupt, incompatible, success)
+ return (verified, s, corrupt, incompatible, success)
dl.addCallback(collect)
return dl
def _err(f):
f.trap(RemoteException, DeadReferenceError)
- return (set(), s.get_serverid(), set(), set(), False)
+ return (set(), s, set(), set(), False)
d.addCallbacks(_got_buckets, _err)
return d
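_err converts only the expected network-level failures into the "no
shares, not responded" tuple; f.trap re-raises anything else, so
programming errors still surface. A self-contained sketch of that
trapping behaviour, with FakeDead standing in for DeadReferenceError:

    from twisted.internet import defer

    class FakeDead(Exception):
        pass

    def _err(f, server):
        f.trap(FakeDead)  # any other exception type propagates onward
        return (set(), server, set(), set(), False)

    d = defer.fail(FakeDead("lost connection"))
    d.addErrback(_err, "server-A")
    # d fires with (set(), 'server-A', set(), set(), False)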
def _check_server_shares(self, s):
"""Return a deferred which eventually fires with a tuple of
- (set(sharenum), serverid, set(), set(), responded) showing all the
+ (set(sharenum), server, set(), set(), responded) showing all the
shares claimed to be served by this server. In case the server is
- disconnected then it fires with (set() serverid, set(), set(), False)
+ disconnected then it fires with (set(), server, set(), set(), False)
(a server disconnecting when we ask it for buckets is the same, for
our purposes, as a server that says it has none, except that we want
to track and report whether or not each server responded.)"""
def _curry_empty_corrupted(res):
- buckets, serverid, responded = res
- return (set(buckets), serverid, set(), set(), responded)
+ buckets, responded = res
+ return (set(buckets), s, set(), set(), responded)
d = self._get_buckets(s, self._verifycap.get_storage_index())
d.addCallback(_curry_empty_corrupted)
return d
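One subtlety worth noting: set(buckets) over the bucket dict keeps only
its keys, i.e. the share numbers. A sketch of the widening done by
_curry_empty_corrupted, with a placeholder server value:

    def curry_empty_corrupted(res, server):
        buckets, responded = res
        # set() over the dict keeps just the sharenum keys; nothing was
        # verified, so the corrupt/incompatible sets are empty
        return (set(buckets), server, set(), set(), responded)

    res = curry_empty_corrupted(({3: "b3", 7: "b7"}, True), "server-A")
    # res == ({3, 7}, 'server-A', set(), set(), True)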
corruptsharelocators = [] # (serverid, storageindex, sharenum)
incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
- for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
+ for theseverifiedshares, thisserver, thesecorruptshares, theseincompatibleshares, thisresponded in results:
+ thisserverid = thisserver.get_serverid()
servers.setdefault(thisserverid, set()).update(theseverifiedshares)
for sharenum in theseverifiedshares:
verifiedshares.setdefault(sharenum, set()).add(thisserverid)
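The aggregation loop now pulls the serverid out of the IServer object
once, at the top, and keys both maps by it. A minimal sketch of that
setdefault pattern with a stub server class (all names fabricated):

    class StubServer(object):
        def __init__(self, serverid):
            self._serverid = serverid
        def get_serverid(self):
            return self._serverid

    results = [
        ({0, 1}, StubServer("v0"), set(), set(), True),
        ({1, 2}, StubServer("v1"), set(), set(), True),
    ]
    servers = {}         # serverid -> set(sharenum)
    verifiedshares = {}  # sharenum -> set(serverid)
    for verified, server, corrupt, incompatible, responded in results:
        serverid = server.get_serverid()
        servers.setdefault(serverid, set()).update(verified)
        for sharenum in verified:
            verifiedshares.setdefault(sharenum, set()).add(serverid)
    # verifiedshares == {0: set(['v0']), 1: set(['v0', 'v1']), 2: set(['v1'])}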
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, idlib, observer, pipeline
+from allmydata.util import mathutil, observer, pipeline
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
- def __init__(self, rref, peerid, storage_index):
+ def __init__(self, rref, server, storage_index):
self._rref = rref
- self._peerid = peerid
- peer_id_s = idlib.shortnodeid_b2a(peerid)
- storage_index_s = si_b2a(storage_index)
- self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
+ self._server = server
+ self._storage_index = storage_index
self._started = False # sent request to server
self._ready = observer.OneShotObserverList() # got response from server
def get_peerid(self):
- return self._peerid
+ return self._server.get_serverid()
def __repr__(self):
- return self._reprstr
+ return "<ReadBucketProxy %s to peer [%s] SI %s>" % \
+ (id(self), self._server.get_name(), si_b2a(self._storage_index))
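After this change ReadBucketProxy needs just two methods from its server
argument: get_serverid() (backing get_peerid) and get_name() (for the
repr). A sketch of the minimal stand-in a test could pass; the class and
values below are illustrative, not part of the Tahoe-LAFS API:

    class StubServer(object):
        """The minimal surface ReadBucketProxy touches on its server."""
        def __init__(self, serverid, name):
            self._serverid = serverid
            self._name = name
        def get_serverid(self):
            return self._serverid
        def get_name(self):
            return self._name

    # e.g. ReadBucketProxy(rref, StubServer("\x01"*20, "xgru5"), storage_index)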
def _start_if_needed(self):
""" Returns a deferred that will be fired when I'm ready to return
from allmydata.util.encodingutil import quote_output, to_str
# use a ReadBucketProxy to parse the bucket and find the uri extension
- bp = ReadBucketProxy(None, '', '')
+ bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x44))
print >>out, "%20s: %d" % ("version", bp._version)
seek = offsets['uri_extension']
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
- ReadBucketProxy.__init__(self, "", "", "")
+ ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
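ImmediateReadBucketProxy reuses the remote reader's parsing logic but
overrides _read to answer from a local share file, which is why it can
pass None for both the rref and the server. A generic sketch of that
override pattern, with illustrative names:

    from twisted.internet import defer

    class RemoteReader(object):
        def _read(self, offset, size):
            raise NotImplementedError  # normally a remote bucket read

    class ImmediateReader(RemoteReader):
        def __init__(self, share_data):
            self._data = share_data
        def _read(self, offset, size):
            # satisfy the read locally, but keep the Deferred interface
            # that the shared parsing code expects
            return defer.succeed(self._data[offset:offset+size])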
else:
# otherwise assume it's immutable
f = ShareFile(fn)
- bp = ReadBucketProxy(None, '', '')
+ bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
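The block data of an immutable share sits between the offset table's
"data" and "plaintext_hash_tree" entries, both relative to the file's
data offset. A worked example of the arithmetic, with fabricated values:

    offsets = {"data": 0x44, "plaintext_hash_tree": 0x1044}
    data_offset = 0x0c  # where the share payload begins inside the file

    start = data_offset + offsets["data"]               # 0x50
    end = data_offset + offsets["plaintext_hash_tree"]  # 0x1050
    block_bytes = end - start                           # 0x1000 bytes of block data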