return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
class ServerTracker:
- def __init__(self, serverid, storage_server,
+ def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
- precondition(isinstance(serverid, str), serverid)
- precondition(len(serverid) == 20, serverid)
- self.serverid = serverid
- self._storageserver = storage_server # to an RIStorageServer
+ self._server = server
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
wbp = layout.make_write_bucket_proxy(None, sharesize,
blocksize, num_segments,
num_share_hashes,
- EXTENSION_SIZE, serverid)
+ EXTENSION_SIZE, server.get_serverid())
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
def __repr__(self):
return ("<ServerTracker for server %s and SI %s>"
- % (idlib.shortnodeid_b2a(self.serverid),
- si_b2a(self.storage_index)[:5]))
+ % (self._server.name(), si_b2a(self.storage_index)[:5]))
+
+ def get_serverid(self):
+ return self._server.get_serverid()
+ def name(self):
+ return self._server.name()
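    # Editor's note (sketch, not part of this change): ServerTracker now
    # assumes its `server` argument provides at least these methods, all
    # of which are called in this file:
    #   server.get_serverid()   -> 20-byte binary nodeid
    #   server.name()           -> short printable name for log messages
    #   server.get_rref()       -> live RIStorageServer remote reference
    #   server.get_lease_seed() -> seed for per-server lease secrets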
def query(self, sharenums):
- d = self._storageserver.callRemote("allocate_buckets",
- self.storage_index,
- self.renew_secret,
- self.cancel_secret,
- sharenums,
- self.allocated_size,
- canary=Referenceable())
+ rref = self._server.get_rref()
+ d = rref.callRemote("allocate_buckets",
+ self.storage_index,
+ self.renew_secret,
+ self.cancel_secret,
+ sharenums,
+ self.allocated_size,
+ canary=Referenceable())
d.addCallback(self._got_reply)
return d
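    # Usage sketch (hypothetical caller, illustrative values): the Deferred
    # returned by query() fires, via _got_reply below, with a pair of
    # share-number collections, e.g.
    #   d = tracker.query(set([0, 1, 2]))
    #   d.addCallback(lambda (alreadygot, allocated): ...)
    # where alreadygot holds shares the server already had, and allocated
    # holds the buckets it just created for us.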
def ask_about_existing_shares(self):
- return self._storageserver.callRemote("get_buckets",
- self.storage_index)
+ rref = self._server.get_rref()
+ return rref.callRemote("get_buckets", self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
self.num_share_hashes,
EXTENSION_SIZE,
- self.serverid)
+ self._server.get_serverid())
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
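    # Note: each remote bucket returned by allocate_buckets is wrapped in a
    # write-bucket proxy (self.wbp_class) and remembered in self.buckets,
    # keyed by share number, so the encoder can later push blocks through
    # the same proxies.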
        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
allocated_size = wbp.get_allocated_size()
- all_servers = [(s.get_serverid(), s.get_rref())
- for s in storage_broker.get_servers_for_psi(storage_index)]
+ all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
raise NoServersError("client gave us zero servers")
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
- (serverid, conn) = server
- v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+ v0 = server.get_rref().version
+ v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
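        # For reference (illustrative values, not from this change), the
        # version dict consulted above is keyed by protocol URL:
        #   {"http://allmydata.org/tahoe/protocols/storage/v1":
        #       {"maximum-immutable-share-size": 2**32, ...}, ...}
        # so a server advertising a 4GiB cap is filtered out of the
        # writable_servers list computed below for any larger allocated_size.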
writable_servers = [server for server in all_servers
if _get_maxsize(server) >= allocated_size]
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
def _make_trackers(servers):
trackers = []
- for (serverid, conn) in servers:
- seed = serverid
+ for s in servers:
+ seed = s.get_lease_seed()
renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
- st = ServerTracker(serverid, conn,
+ st = ServerTracker(s,
share_size, block_size,
num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers
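        # Design note (sketch): deriving each server's renew/cancel secrets
        # from the per-file secrets plus that server's lease seed means a
        # server only ever learns secrets specific to itself; one server's
        # secrets are useless for renewing or cancelling leases elsewhere.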
for tracker in readonly_trackers:
assert isinstance(tracker, ServerTracker)
d = tracker.ask_about_existing_shares()
- d.addBoth(self._handle_existing_response, tracker.serverid)
+ d.addBoth(self._handle_existing_response, tracker)
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
- (idlib.shortnodeid_b2a(tracker.serverid),),
- level=log.NOISY)
+ (tracker.name(),), level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
- def _handle_existing_response(self, res, serverid):
+ def _handle_existing_response(self, res, tracker):
"""
I handle responses to the queries sent by
Tahoe2ServerSelector._existing_shares.
"""
+ serverid = tracker.get_serverid()
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
- % (idlib.shortnodeid_b2a(serverid), res),
- level=log.UNUSUAL)
+ % (tracker.name(), res), level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s"
- % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))),
+ % (tracker.name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
self.preexisting_shares.setdefault(bucket, set()).add(serverid)
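        # Bookkeeping sketch (illustrative): after these responses,
        # preexisting_shares maps shnum -> set(serverids), e.g.
        #   {0: set([sid_a]), 3: set([sid_a, sid_b])}
        # and serverids_with_shares records every server known to hold at
        # least one share of this file.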
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
if self._status:
self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.name(),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY)
progress = False
for s in alreadygot:
- self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
+ self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
            # newly allocated buckets also count as progress
            if allocated:
                progress = True
if allocated or alreadygot:
- self.serverids_with_shares.add(tracker.serverid)
+ self.serverids_with_shares.add(tracker.get_serverid())
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
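            # Worked example (illustrative values): with
            # shares_to_ask = set([0, 1, 2]), alreadygot = (1,) and
            # allocated = (0,):
            #   not_yet_present = {0, 1, 2} - {1} = {0, 2}
            #   still_homeless  = {0, 2} - {0}   = {2}
            # so share 2 stays homeless and must be offered to another
            # server on a later pass.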
buckets.update(tracker.buckets)
for shnum in tracker.buckets:
self._server_trackers[shnum] = tracker
- servermap.setdefault(shnum, set()).add(tracker.serverid)
+ servermap.setdefault(shnum, set()).add(tracker.get_serverid())
assert len(buckets) == sum([len(tracker.buckets)
for tracker in upload_trackers]), \
"%s (%s) != %s (%s)" % (
len(buckets),
buckets,
sum([len(tracker.buckets) for tracker in upload_trackers]),
- [(t.buckets, t.serverid) for t in upload_trackers]
+ [(t.buckets, t.get_serverid()) for t in upload_trackers]
)
encoder.set_shareholders(buckets, servermap)
r = self._results
for shnum in self._encoder.get_shares_placed():
server_tracker = self._server_trackers[shnum]
- serverid = server_tracker.serverid
+ serverid = server_tracker.get_serverid()
r.sharemap.add(shnum, serverid)
r.servermap.add(serverid, shnum)
r.pushed_shares = len(self._encoder.get_shares_placed())
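        # The upload results keep two views of the final placement (sketch):
        #   r.sharemap:  shnum    -> set(serverids) holding that share
        #   r.servermap: serverid -> set(shnums) held by that server
        # and pushed_shares counts the shares actually placed by the encoder
        # during this upload.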