num_segments, total_shares, needed_shares,
servers_of_happiness):
"""
- @return: (upload_trackers, already_servers), where upload_trackers is
- a set of ServerTracker instances that have agreed to hold
+ @return: (upload_trackers, already_serverids), where upload_trackers
+ is a set of ServerTracker instances that have agreed to hold
some shares for us (the shareids are stashed inside the
- ServerTracker), and already_servers is a dict mapping shnum
- to a set of serverids which claim to already have the share.
+ ServerTracker), and already_serverids is a dict mapping
+ shnum to a set of serverids for servers which claim to
+ already have the share.
"""
if self._status:
self.needed_shares = needed_shares
self.homeless_shares = set(range(total_shares))
- self.contacted_trackers = [] # servers worth asking again
- self.contacted_trackers2 = [] # servers that we have asked again
- self._started_second_pass = False
self.use_trackers = set() # ServerTrackers that have shares assigned
# to them
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
renew, cancel)
trackers.append(st)
return trackers
- self.uncontacted_trackers = _make_trackers(writable_servers)
+
+ # We assign each servers/trackers into one three lists. They all
+ # start in the "first pass" list. During the first pass, as we ask
+ # each one to hold a share, we move their tracker to the "second
+ # pass" list, until the first-pass list is empty. Then during the
+ # second pass, as we ask each to hold more shares, we move their
+ # tracker to the "next pass" list, until the second-pass list is
+ # empty. Then we move everybody from the next-pass list back to the
+ # second-pass list and repeat the "second" pass (really the third,
+ # fourth, etc pass), until all shares are assigned, or we've run out
+ # of potential servers.
+ self.first_pass_trackers = _make_trackers(writable_servers)
+ self.second_pass_trackers = [] # servers worth asking again
+ self.next_pass_trackers = [] # servers that we have asked again
+ self._started_second_pass = False
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
shares_to_spread = sum([len(list(sharelist)) - 1
for (server, sharelist)
in shares.items()])
- if delta <= len(self.uncontacted_trackers) and \
+ if delta <= len(self.first_pass_trackers) and \
shares_to_spread >= delta:
items = shares.items()
while len(self.homeless_shares) < delta:
self.log(servmsg, level=log.INFREQUENT)
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
- if self.uncontacted_trackers:
- tracker = self.uncontacted_trackers.pop(0)
+ if self.first_pass_trackers:
+ tracker = self.first_pass_trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
assert isinstance(tracker, ServerTracker)
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
- self.contacted_trackers)
+ self.second_pass_trackers)
return d
- elif self.contacted_trackers:
+ elif self.second_pass_trackers:
# ask a server that we've already asked.
if not self._started_second_pass:
self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
- len(self.contacted_trackers))
- tracker = self.contacted_trackers.pop(0)
+ len(self.second_pass_trackers))
+ tracker = self.second_pass_trackers.pop(0)
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
self.homeless_shares -= shares_to_ask
self.query_count += 1
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
- self.contacted_trackers2)
+ self.next_pass_trackers)
return d
- elif self.contacted_trackers2:
+ elif self.next_pass_trackers:
# we've finished the second-or-later pass. Move all the remaining
- # servers back into self.contacted_trackers for the next pass.
- self.contacted_trackers.extend(self.contacted_trackers2)
- self.contacted_trackers2[:] = []
+ # servers back into self.second_pass_trackers for the next pass.
+ self.second_pass_trackers.extend(self.next_pass_trackers)
+ self.next_pass_trackers[:] = []
return self._loop()
else:
# no more servers. If we haven't placed enough shares, we fail.
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares |= shares_to_ask
- if (self.uncontacted_trackers
- or self.contacted_trackers
- or self.contacted_trackers2):
+ if (self.first_pass_trackers
+ or self.second_pass_trackers
+ or self.next_pass_trackers):
# there is still hope, so just loop
pass
else:
d.addCallback(_done)
return d
- def set_shareholders(self, (upload_trackers, already_servers), encoder):
+ def set_shareholders(self, (upload_trackers, already_serverids), encoder):
"""
@param upload_trackers: a sequence of ServerTracker objects that
have agreed to hold some shares for us (the
shareids are stashed inside the ServerTracker)
- @paran already_servers: a dict mapping sharenum to a set of serverids
- that claim to already have this share
+
+ @paran already_serverids: a dict mapping sharenum to a set of
+ serverids for servers that claim to already
+ have this share
"""
- msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
+ msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
values = ([', '.join([str_shareloc(k,v)
for k,v in st.buckets.iteritems()])
- for st in upload_trackers], already_servers)
+ for st in upload_trackers], already_serverids)
self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
- self._results.preexisting_shares = len(already_servers)
+ self._results.preexisting_shares = len(already_serverids)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
for tracker in upload_trackers:
assert isinstance(tracker, ServerTracker)
buckets = {}
- servermap = already_servers.copy()
+ servermap = already_serverids.copy()
for tracker in upload_trackers:
buckets.update(tracker.buckets)
for shnum in tracker.buckets: