num_segments, total_shares, needed_shares,
servers_of_happiness):
"""
- @return: (upload_servers, already_servers), where upload_servers is
+ @return: (upload_trackers, already_servers), where upload_trackers is
a set of ServerTracker instances that have agreed to hold
some shares for us (the shareids are stashed inside the
ServerTracker), and already_servers is a dict mapping shnum
- to a set of servers which claim to already have the share.
+ to a set of serverids which claim to already have the share.
"""
if self._status:
# These servers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later.
- self.servers_with_shares = set()
+ self.serverids_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
return dl
- def _handle_existing_response(self, res, server):
+ def _handle_existing_response(self, res, serverid):
"""
I handle responses to the queries sent by
Tahoe2ServerSelector._existing_shares.
"""
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
- % (idlib.shortnodeid_b2a(server), res),
+ % (idlib.shortnodeid_b2a(serverid), res),
level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
else:
buckets = res
if buckets:
- self.servers_with_shares.add(server)
+ self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s"
- % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
+ % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
- self.preexisting_shares.setdefault(bucket, set()).add(server)
+ self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket)
self.full_count += 1
self.bad_query_count += 1
return self._loop()
else:
# Redistribution won't help us; fail.
- server_count = len(self.servers_with_shares)
+ server_count = len(self.serverids_with_shares)
failmsg = failure_message(server_count,
self.needed_shares,
self.servers_of_happiness,
merged = merge_peers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
- msg = failure_message(len(self.servers_with_shares),
+ msg = failure_message(len(self.serverids_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
progress = True
if allocated or alreadygot:
- self.servers_with_shares.add(tracker.serverid)
+ self.serverids_with_shares.add(tracker.serverid)
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
d.addCallback(_done)
return d
- def set_shareholders(self, (upload_servers, already_servers), encoder):
+ def set_shareholders(self, (upload_trackers, already_servers), encoder):
"""
- @param upload_servers: a sequence of ServerTracker objects that
- have agreed to hold some shares for us (the
- shareids are stashed inside the ServerTracker)
+ @param upload_trackers: a sequence of ServerTracker objects that
+ have agreed to hold some shares for us (the
+ shareids are stashed inside the ServerTracker)
- @paran already_servers: a dict mapping sharenum to a set of serverids
+ @param already_servers: a dict mapping sharenum to a set of serverids
that claim to already have this share
"""
- msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
- values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
- for s in upload_servers], already_servers)
+ msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
+ values = ([', '.join([str_shareloc(k,v)
+ for k,v in st.buckets.iteritems()])
+ for st in upload_trackers], already_servers)
self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
self._results.preexisting_shares = len(already_servers)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
- for server in upload_servers:
- assert isinstance(server, ServerTracker)
+ for tracker in upload_trackers:
+ assert isinstance(tracker, ServerTracker)
buckets = {}
servermap = already_servers.copy()
- for server in upload_servers:
- buckets.update(server.buckets)
- for shnum in server.buckets:
- self._server_trackers[shnum] = server
- servermap.setdefault(shnum, set()).add(server.serverid)
- assert len(buckets) == sum([len(server.buckets)
- for server in upload_servers]), \
+ for tracker in upload_trackers:
+ buckets.update(tracker.buckets)
+ for shnum in tracker.buckets:
+ self._server_trackers[shnum] = tracker
+ servermap.setdefault(shnum, set()).add(tracker.serverid)
+ assert len(buckets) == sum([len(tracker.buckets)
+ for tracker in upload_trackers]), \
"%s (%s) != %s (%s)" % (
len(buckets),
buckets,
- sum([len(server.buckets) for server in upload_servers]),
- [(s.buckets, s.serverid) for s in upload_servers]
+ sum([len(tracker.buckets) for tracker in upload_trackers]),
+ [(t.buckets, t.serverid) for t in upload_trackers]
)
encoder.set_shareholders(buckets, servermap)