self.needed_shares = needed_shares
self.homeless_shares = set(range(total_shares))
- self.contacted_servers = [] # servers worth asking again
- self.contacted_servers2 = [] # servers that we have asked again
+ self.contacted_trackers = [] # servers worth asking again
+ self.contacted_trackers2 = [] # servers that we have asked again
self._started_second_pass = False
- self.use_servers = set() # ServerTrackers that have shares assigned
- # to them
+ self.use_trackers = set() # ServerTrackers that have shares assigned
+ # to them
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
- # We don't try to allocate shares to these servers, since they've said
- # that they're incapable of storing shares of the size that we'd want
- # to store. We keep them around because they may have existing shares
- # for this storage index, which we want to know about for accurate
- # servers_of_happiness accounting
- # (this is eventually a list, but it is initialized later)
- self.readonly_servers = None
+
# These servers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later.
self.servers_with_shares = set()
bucket_cancel_secret_hash(file_cancel_secret,
serverid))
for (serverid, conn) in servers]
- self.uncontacted_servers = _make_trackers(writable_servers)
- self.readonly_servers = _make_trackers(readonly_servers)
+ self.uncontacted_trackers = _make_trackers(writable_servers)
+
+ # We don't try to allocate shares to these servers, since they've
+ # said that they're incapable of storing shares of the size that we'd
+ # want to store. We ask them about existing shares for this storage
+ # index, which we want to know about for accurate
+ # servers_of_happiness accounting, then we forget about them.
+ readonly_trackers = _make_trackers(readonly_servers)
+
# We now ask servers that can't hold any new shares about existing
# shares that they might have for our SI. Once this is done, we
# start placing the shares that we haven't already accounted
# for.
ds = []
- if self._status and self.readonly_servers:
+ if self._status and readonly_trackers:
self._status.set_status("Contacting readonly servers to find "
"any existing shares")
- for server in self.readonly_servers:
- assert isinstance(server, ServerTracker)
- d = server.ask_about_existing_shares()
- d.addBoth(self._handle_existing_response, server.serverid)
+ for tracker in readonly_trackers:
+ assert isinstance(tracker, ServerTracker)
+ d = tracker.ask_about_existing_shares()
+ d.addBoth(self._handle_existing_response, tracker.serverid)
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
- (idlib.shortnodeid_b2a(server.serverid),),
+ (idlib.shortnodeid_b2a(tracker.serverid),),
level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
def _loop(self):
if not self.homeless_shares:
- merged = merge_peers(self.preexisting_shares, self.use_servers)
+ merged = merge_peers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if self.servers_of_happiness <= effective_happiness:
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
- "self.use_servers: %s, self.preexisting_shares: %s") \
+ "self.use_trackers: %s, self.preexisting_shares: %s") \
% (self, self._get_progress_message(),
pretty_print_shnum_to_servers(merged),
[', '.join([str_shareloc(k,v)
- for k,v in s.buckets.iteritems()])
- for s in self.use_servers],
+ for k,v in st.buckets.iteritems()])
+ for st in self.use_trackers],
pretty_print_shnum_to_servers(self.preexisting_shares))
self.log(msg, level=log.OPERATIONAL)
- return (self.use_servers, self.preexisting_shares)
+ return (self.use_trackers, self.preexisting_shares)
else:
# We're not okay right now, but maybe we can fix it by
# redistributing some shares. In cases where one or two
shares_to_spread = sum([len(list(sharelist)) - 1
for (server, sharelist)
in shares.items()])
- if delta <= len(self.uncontacted_servers) and \
+ if delta <= len(self.uncontacted_trackers) and \
shares_to_spread >= delta:
items = shares.items()
while len(self.homeless_shares) < delta:
if not self.preexisting_shares[share]:
del self.preexisting_shares[share]
items.append((server, sharelist))
- for writer in self.use_servers:
+ for writer in self.use_trackers:
writer.abort_some_buckets(self.homeless_shares)
return self._loop()
else:
self.log(servmsg, level=log.INFREQUENT)
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
- if self.uncontacted_servers:
- server = self.uncontacted_servers.pop(0)
+ if self.uncontacted_trackers:
+ tracker = self.uncontacted_trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
- assert isinstance(server, ServerTracker)
+ assert isinstance(tracker, ServerTracker)
shares_to_ask = set(sorted(self.homeless_shares)[:1])
self.homeless_shares -= shares_to_ask
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(server.serverid),
+ % (idlib.shortnodeid_b2a(tracker.serverid),
len(self.homeless_shares)))
- d = server.query(shares_to_ask)
- d.addBoth(self._got_response, server, shares_to_ask,
- self.contacted_servers)
+ d = tracker.query(shares_to_ask)
+ d.addBoth(self._got_response, tracker, shares_to_ask,
+ self.contacted_trackers)
return d
- elif self.contacted_servers:
+ elif self.contacted_trackers:
# ask a server that we've already asked.
if not self._started_second_pass:
self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
- len(self.contacted_servers))
- server = self.contacted_servers.pop(0)
+ len(self.contacted_trackers))
+ tracker = self.contacted_trackers.pop(0)
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
self.homeless_shares -= shares_to_ask
self.query_count += 1
if self._status:
self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(server.serverid),
+ % (idlib.shortnodeid_b2a(tracker.serverid),
len(self.homeless_shares)))
- d = server.query(shares_to_ask)
- d.addBoth(self._got_response, server, shares_to_ask,
- self.contacted_servers2)
+ d = tracker.query(shares_to_ask)
+ d.addBoth(self._got_response, tracker, shares_to_ask,
+ self.contacted_trackers2)
return d
- elif self.contacted_servers2:
+ elif self.contacted_trackers2:
# we've finished the second-or-later pass. Move all the remaining
- # servers back into self.contacted_servers for the next pass.
- self.contacted_servers.extend(self.contacted_servers2)
- self.contacted_servers2[:] = []
+ # servers back into self.contacted_trackers for the next pass.
+ self.contacted_trackers.extend(self.contacted_trackers2)
+ self.contacted_trackers2[:] = []
return self._loop()
else:
# no more servers. If we haven't placed enough shares, we fail.
- merged = merge_peers(self.preexisting_shares, self.use_servers)
+ merged = merge_peers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.servers_with_shares),
msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
self.log(msg, level=log.OPERATIONAL)
- return (self.use_servers, self.preexisting_shares)
+ return (self.use_trackers, self.preexisting_shares)
- def _got_response(self, res, server, shares_to_ask, put_server_here):
+ def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
- self.log("%s got error during server selection: %s" % (server, res),
+ self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares |= shares_to_ask
- if (self.uncontacted_servers
- or self.contacted_servers
- or self.contacted_servers2):
+ if (self.uncontacted_trackers
+ or self.contacted_trackers
+ or self.contacted_trackers2):
# there is still hope, so just loop
pass
else:
# failure we got: if a coding error causes all servers to fail
# in the same way, this allows the common failure to be seen
# by the uploader and should help with debugging
- msg = ("last failure (from %s) was: %s" % (server, res))
+ msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
- % (idlib.shortnodeid_b2a(server.serverid),
+ % (idlib.shortnodeid_b2a(tracker.serverid),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY)
progress = False
for s in alreadygot:
- self.preexisting_shares.setdefault(s, set()).add(server.serverid)
+ self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
# the ServerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.
if allocated:
- self.use_servers.add(server)
+ self.use_trackers.add(tracker)
progress = True
if allocated or alreadygot:
- self.servers_with_shares.add(server.serverid)
+ self.servers_with_shares.add(tracker.serverid)
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
else:
# if they *were* able to accept everything, they might be
# willing to accept even more.
- put_server_here.append(server)
+ put_tracker_here.append(tracker)
# now loop
return self._loop()
place shares for this file. I then raise an
UploadUnhappinessError with my msg argument.
"""
- for server in self.use_servers:
- assert isinstance(server, ServerTracker)
-
- server.abort()
-
+ for tracker in self.use_trackers:
+ assert isinstance(tracker, ServerTracker)
+ tracker.abort()
raise UploadUnhappinessError(msg)