Let Uploader retain History instead of passing it into upload(). Fixes #1079.
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index 2758520b092e0c0e7b5015877ba1aa35329184a2..013aca97cbc6a9ce2c221531cc9e444f5bfdd33e 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -14,7 +14,7 @@ from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
 from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.happinessutil import servers_of_happiness, \
-                                         shares_by_server, merge_peers, \
+                                         shares_by_server, merge_servers, \
                                          failure_message
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
@@ -69,21 +69,18 @@ def pretty_print_shnum_to_servers(s):
     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
 
 class ServerTracker:
-    def __init__(self, serverid, storage_server,
+    def __init__(self, server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
                  bucket_renewal_secret, bucket_cancel_secret):
-        precondition(isinstance(serverid, str), serverid)
-        precondition(len(serverid) == 20, serverid)
-        self.serverid = serverid
-        self._storageserver = storage_server # to an RIStorageServer
+        self._server = server
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
 
-        wbp = layout.make_write_bucket_proxy(None, sharesize,
+        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                              blocksize, num_segments,
                                              num_share_hashes,
-                                             EXTENSION_SIZE, serverid)
+                                             EXTENSION_SIZE)
         self.wbp_class = wbp.__class__ # to create more of them
         self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
@@ -96,34 +93,38 @@ class ServerTracker:
 
     def __repr__(self):
         return ("<ServerTracker for server %s and SI %s>"
-                % (idlib.shortnodeid_b2a(self.serverid),
-                   si_b2a(self.storage_index)[:5]))
+                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+
+    def get_serverid(self):
+        return self._server.get_serverid()
+    def get_name(self):
+        return self._server.get_name()
 
     def query(self, sharenums):
-        d = self._storageserver.callRemote("allocate_buckets",
-                                           self.storage_index,
-                                           self.renew_secret,
-                                           self.cancel_secret,
-                                           sharenums,
-                                           self.allocated_size,
-                                           canary=Referenceable())
+        rref = self._server.get_rref()
+        d = rref.callRemote("allocate_buckets",
+                            self.storage_index,
+                            self.renew_secret,
+                            self.cancel_secret,
+                            sharenums,
+                            self.allocated_size,
+                            canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
 
     def ask_about_existing_shares(self):
-        return self._storageserver.callRemote("get_buckets",
-                                              self.storage_index)
+        rref = self._server.get_rref()
+        return rref.callRemote("get_buckets", self.storage_index)
 
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
-            bp = self.wbp_class(rref, self.sharesize,
+            bp = self.wbp_class(rref, self._server, self.sharesize,
                                 self.blocksize,
                                 self.num_segments,
                                 self.num_share_hashes,
-                                EXTENSION_SIZE,
-                                self.serverid)
+                                EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
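
Note: after this change the tracker reaches the network only through self._server. Collected from the calls above, the interface it assumes amounts to the following (a hypothetical stub, e.g. for a unit test; the real object comes from the storage broker, and the rref is a foolscap RemoteReference with .callRemote() and .version):

    class FakeServer:
        """Hypothetical stand-in for the storage broker's server object."""
        def __init__(self, serverid, rref):
            self._serverid = serverid   # 20-byte binary nodeid
            self._rref = rref           # remote ref to an RIStorageServer
        def get_serverid(self):
            return self._serverid
        def get_name(self):             # short printable form, for logging
            return idlib.shortnodeid_b2a(self._serverid)
        def get_lease_seed(self):       # seeds per-server renew/cancel secrets
            return self._serverid
        def get_rref(self):
            return self._rref
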
@@ -147,7 +148,7 @@ class ServerTracker:
 
 
 def str_shareloc(shnum, bucketwriter):
-    return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+    return "%s: %s" % (shnum, bucketwriter.get_servername(),)
 
 class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
@@ -171,11 +172,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                          num_segments, total_shares, needed_shares,
                          servers_of_happiness):
         """
-        @return: (upload_servers, already_servers), where upload_servers is
-                 a set of ServerTracker instances that have agreed to hold
+        @return: (upload_trackers, already_serverids), where upload_trackers
+                 is a set of ServerTracker instances that have agreed to hold
                  some shares for us (the shareids are stashed inside the
-                 ServerTracker), and already_servers is a dict mapping shnum
-                 to a set of servers which claim to already have the share.
+                 ServerTracker), and already_serverids is a dict mapping
+                 shnum to a set of serverids for servers which claim to
+                 already have the share.
         """
 
         if self._status:
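
Note: a caller unpacks the Deferred's result along the lines below (a sketch; _done is hypothetical, and the real consumer is CHKUploader.set_shareholders further down):

    def _done((upload_trackers, already_serverids)):
        for tracker in upload_trackers:
            # shnums this server agreed to hold are stashed in tracker.buckets
            print tracker.get_name(), sorted(tracker.buckets)
        for shnum, serverids in already_serverids.iteritems():
            print "sh%d already held by %d server(s)" % (shnum, len(serverids))
    # d = selector.get_shareholders(...)  # arguments as in the signature above
    # d.addCallback(_done)
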
@@ -186,22 +188,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         self.needed_shares = needed_shares
 
         self.homeless_shares = set(range(total_shares))
-        self.contacted_servers = [] # servers worth asking again
-        self.contacted_servers2 = [] # servers that we have asked again
-        self._started_second_pass = False
-        self.use_servers = set() # ServerTrackers that have shares assigned
-                                 # to them
+        self.use_trackers = set() # ServerTrackers that have shares assigned
+                                  # to them
         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
-        # We don't try to allocate shares to these servers, since they've said
-        # that they're incapable of storing shares of the size that we'd want
-        # to store. We keep them around because they may have existing shares
-        # for this storage index, which we want to know about for accurate
-        # servers_of_happiness accounting
-        # (this is eventually a list, but it is initialized later)
-        self.readonly_servers = None
+
         # These servers have shares -- any shares -- for our SI. We keep
         # track of these to write an error message with them later.
-        self.servers_with_shares = set()
+        self.serverids_with_shares = set()
 
         # this needed_hashes computation should mirror
         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -211,12 +204,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
 
         # figure out how much space to ask for
-        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
-                                             num_share_hashes, EXTENSION_SIZE,
-                                             None)
+        wbp = layout.make_write_bucket_proxy(None, None,
+                                             share_size, 0, num_segments,
+                                             num_share_hashes, EXTENSION_SIZE)
         allocated_size = wbp.get_allocated_size()
-        all_servers = [(s.get_serverid(), s.get_rref())
-                       for s in storage_broker.get_servers_for_psi(storage_index)]
+        all_servers = storage_broker.get_servers_for_psi(storage_index)
         if not all_servers:
             raise NoServersError("client gave us zero servers")
 
@@ -225,12 +217,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         # field) from getting large shares (for files larger than about
         # 12GiB). See #439 for details.
         def _get_maxsize(server):
-            (serverid, conn) = server
-            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+            v0 = server.get_rref().version
+            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
             return v1["maximum-immutable-share-size"]
-        writable_servers = [server for server in all_servers
+        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
-        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
 
         # decide upon the renewal/cancel secrets, to include them in the
         # allocate_buckets query.
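
Note: _get_maxsize() reads the server's advertised protocol parameters off the remote reference. The nested dictionary looks roughly like this (the field name comes from the code above; the value is illustrative):

    version = {
        "http://allmydata.org/tahoe/protocols/storage/v1": {
            # servers with 4-byte share offsets cap shares near 4GiB, which
            # is what keeps them out of writeable_servers for large files
            "maximum-immutable-share-size": 2**32 - 1,
            # ...other advertised parameters elided...
        },
    }
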
@@ -242,60 +234,83 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
         def _make_trackers(servers):
-           return [ServerTracker(serverid, conn,
-                                 share_size, block_size,
-                                 num_segments, num_share_hashes,
-                                 storage_index,
-                                 bucket_renewal_secret_hash(file_renewal_secret,
-                                                            serverid),
-                                 bucket_cancel_secret_hash(file_cancel_secret,
-                                                           serverid))
-                   for (serverid, conn) in servers]
-        self.uncontacted_servers = _make_trackers(writable_servers)
-        self.readonly_servers = _make_trackers(readonly_servers)
+            trackers = []
+            for s in servers:
+                seed = s.get_lease_seed()
+                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
+                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
+                st = ServerTracker(s,
+                                   share_size, block_size,
+                                   num_segments, num_share_hashes,
+                                   storage_index,
+                                   renew, cancel)
+                trackers.append(st)
+            return trackers
+
+        # We assign each server/tracker to one of three lists. They all
+        # start in the "first pass" list. During the first pass, as we ask
+        # each one to hold a share, we move their tracker to the "second
+        # pass" list, until the first-pass list is empty. Then during the
+        # second pass, as we ask each to hold more shares, we move their
+        # tracker to the "next pass" list, until the second-pass list is
+        # empty. Then we move everybody from the next-pass list back to the
+        # second-pass list and repeat the "second" pass (really the third,
+        # fourth, etc pass), until all shares are assigned, or we've run out
+        # of potential servers.
+        self.first_pass_trackers = _make_trackers(writeable_servers)
+        self.second_pass_trackers = [] # servers worth asking again
+        self.next_pass_trackers = [] # servers that we have asked again
+        self._started_second_pass = False
+
+        # We don't try to allocate shares to these servers, since they've
+        # said that they're incapable of storing shares of the size that we'd
+        # want to store. We ask them about existing shares for this storage
+        # index, which we want to know about for accurate
+        # servers_of_happiness accounting, then we forget about them.
+        readonly_trackers = _make_trackers(readonly_servers)
+
         # We now ask servers that can't hold any new shares about existing
         # shares that they might have for our SI. Once this is done, we
         # start placing the shares that we haven't already accounted
         # for.
         ds = []
-        if self._status and self.readonly_servers:
+        if self._status and readonly_trackers:
             self._status.set_status("Contacting readonly servers to find "
                                     "any existing shares")
-        for server in self.readonly_servers:
-            assert isinstance(server, ServerTracker)
-            d = server.ask_about_existing_shares()
-            d.addBoth(self._handle_existing_response, server.serverid)
+        for tracker in readonly_trackers:
+            assert isinstance(tracker, ServerTracker)
+            d = tracker.ask_about_existing_shares()
+            d.addBoth(self._handle_existing_response, tracker)
             ds.append(d)
             self.num_servers_contacted += 1
             self.query_count += 1
             self.log("asking server %s for any existing shares" %
-                     (idlib.shortnodeid_b2a(server.serverid),),
-                    level=log.NOISY)
+                     (tracker.get_name(),), level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
         return dl
 
 
-    def _handle_existing_response(self, res, server):
+    def _handle_existing_response(self, res, tracker):
         """
         I handle responses to the queries sent by
         Tahoe2ServerSelector._existing_shares.
         """
+        serverid = tracker.get_serverid()
         if isinstance(res, failure.Failure):
             self.log("%s got error during existing shares check: %s"
-                    % (idlib.shortnodeid_b2a(server), res),
-                    level=log.UNUSUAL)
+                    % (tracker.get_name(), res), level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
         else:
             buckets = res
             if buckets:
-                self.servers_with_shares.add(server)
+                self.serverids_with_shares.add(serverid)
             self.log("response to get_buckets() from server %s: alreadygot=%s"
-                    % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
+                    % (tracker.get_name(), tuple(sorted(buckets))),
                     level=log.NOISY)
             for bucket in buckets:
-                self.preexisting_shares.setdefault(bucket, set()).add(server)
+                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                 self.homeless_shares.discard(bucket)
             self.full_count += 1
             self.bad_query_count += 1
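
Note: stripped of the Deferred plumbing, the three-list rotation described in the comment above behaves like this toy loop (a sketch only: assign_shares() and the ask(tracker, shares) -> accepted-subset convention are invented here, not the real _loop):

    def assign_shares(first_pass, homeless, ask):
        second_pass, next_pass = [], []
        while homeless:
            if first_pass:
                tracker, pool = first_pass.pop(0), second_pass
                shares = set(sorted(homeless)[:1])         # one share per first ask
            elif second_pass:
                n = -(-len(homeless) // len(second_pass))  # mathutil.div_ceil
                tracker, pool = second_pass.pop(0), next_pass
                shares = set(sorted(homeless)[:n])
            elif next_pass:
                second_pass, next_pass = next_pass, []     # start another pass
                continue
            else:
                return homeless                            # out of servers
            homeless -= shares
            rejected = shares - ask(tracker, shares)
            homeless |= rejected                           # rejected shares retry
            if not rejected:
                pool.append(tracker)   # accepted everything; worth asking again
        return homeless
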
@@ -323,19 +338,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     def _loop(self):
         if not self.homeless_shares:
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if self.servers_of_happiness <= effective_happiness:
                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
-                       "self.use_servers: %s, self.preexisting_shares: %s") \
+                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                        % (self, self._get_progress_message(),
                           pretty_print_shnum_to_servers(merged),
                           [', '.join([str_shareloc(k,v)
-                                      for k,v in s.buckets.iteritems()])
-                           for s in self.use_servers],
+                                      for k,v in st.buckets.iteritems()])
+                           for st in self.use_trackers],
                           pretty_print_shnum_to_servers(self.preexisting_shares))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
             else:
                 # We're not okay right now, but maybe we can fix it by
                 # redistributing some shares. In cases where one or two
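
Note: merge_servers() folds the shnums held by use_trackers into the preexisting map, yielding one {shnum: set(serverids)} table for servers_of_happiness() to score. A toy illustration (happiness is, roughly, the size of a maximum matching between distinct servers and distinct shares):

    merged = {
        0: set(["serverA"]),
        1: set(["serverA", "serverB"]),   # sh1 placed twice
        2: set(["serverB"]),
    }
    # servers_of_happiness(merged) == 2: a matching can pair at most two
    # distinct servers with distinct shares (e.g. A<->sh0, B<->sh2), so the
    # duplicate placement of sh1 does not inflate the score.
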
@@ -352,7 +367,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_servers) and \
+                if delta <= len(self.first_pass_trackers) and \
                    shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
@@ -368,12 +383,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                             if not self.preexisting_shares[share]:
                                 del self.preexisting_shares[share]
                             items.append((server, sharelist))
-                        for writer in self.use_servers:
+                        for writer in self.use_trackers:
                             writer.abort_some_buckets(self.homeless_shares)
                     return self._loop()
                 else:
                     # Redistribution won't help us; fail.
-                    server_count = len(self.servers_with_shares)
+                    server_count = len(self.serverids_with_shares)
                     failmsg = failure_message(server_count,
                                               self.needed_shares,
                                               self.servers_of_happiness,
@@ -388,10 +403,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                     self.log(servmsg, level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
 
-        if self.uncontacted_servers:
-            server = self.uncontacted_servers.pop(0)
+        if self.first_pass_trackers:
+            tracker = self.first_pass_trackers.pop(0)
             # TODO: don't pre-convert all serverids to ServerTrackers
-            assert isinstance(server, ServerTracker)
+            assert isinstance(tracker, ServerTracker)
 
             shares_to_ask = set(sorted(self.homeless_shares)[:1])
             self.homeless_shares -= shares_to_ask
@@ -400,45 +415,45 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.second_pass_trackers)
             return d
-        elif self.contacted_servers:
+        elif self.second_pass_trackers:
             # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_servers))
-            server = self.contacted_servers.pop(0)
+                                           len(self.second_pass_trackers))
+            tracker = self.second_pass_trackers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers2)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.next_pass_trackers)
             return d
-        elif self.contacted_servers2:
+        elif self.next_pass_trackers:
             # we've finished the second-or-later pass. Move all the remaining
-            # servers back into self.contacted_servers for the next pass.
-            self.contacted_servers.extend(self.contacted_servers2)
-            self.contacted_servers2[:] = []
+            # servers back into self.second_pass_trackers for the next pass.
+            self.second_pass_trackers.extend(self.next_pass_trackers)
+            self.next_pass_trackers[:] = []
             return self._loop()
         else:
             # no more servers. If we haven't placed enough shares, we fail.
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
-                msg = failure_message(len(self.servers_with_shares),
+                msg = failure_message(len(self.serverids_with_shares),
                                       self.needed_shares,
                                       self.servers_of_happiness,
                                       effective_happiness)
@@ -455,20 +470,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
 
-    def _got_response(self, res, server, shares_to_ask, put_server_here):
+    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            self.log("%s got error during server selection: %s" % (server, res),
+            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_servers
-                or self.contacted_servers
-                or self.contacted_servers2):
+            if (self.first_pass_trackers
+                or self.second_pass_trackers
+                or self.next_pass_trackers):
                 # there is still hope, so just loop
                 pass
             else:
@@ -477,17 +492,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 # failure we got: if a coding error causes all servers to fail
                 # in the same way, this allows the common failure to be seen
                 # by the uploader and should help with debugging
-                msg = ("last failure (from %s) was: %s" % (server, res))
+                msg = ("last failure (from %s) was: %s" % (tracker, res))
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(server.serverid),
+                    % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
             progress = False
             for s in alreadygot:
-                self.preexisting_shares.setdefault(s, set()).add(server.serverid)
+                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
@@ -497,11 +512,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             # the ServerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
             if allocated:
-                self.use_servers.add(server)
+                self.use_trackers.add(tracker)
                 progress = True
 
             if allocated or alreadygot:
-                self.servers_with_shares.add(server.serverid)
+                self.serverids_with_shares.add(tracker.get_serverid())
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
@@ -532,7 +547,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                put_server_here.append(server)
+                put_tracker_here.append(tracker)
 
         # now loop
         return self._loop()
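
Note: allocate_buckets answers with an (alreadygot, allocated) pair, and the bookkeeping above splits shares_to_ask three ways. A toy trace with hypothetical values:

    shares_to_ask = set([3, 4, 5])
    alreadygot, allocated = set([3]), set([4])  # hypothetical server reply
    # sh3 -> preexisting_shares (the server already had it)
    # sh4 -> tracker.buckets; the tracker joins use_trackers
    # sh5 -> still homeless, returned to self.homeless_shares
    still_homeless = (shares_to_ask - alreadygot) - allocated
    assert still_homeless == set([5])
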
@@ -545,11 +560,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         place shares for this file. I then raise an
         UploadUnhappinessError with my msg argument.
         """
-        for server in self.use_servers:
-            assert isinstance(server, ServerTracker)
-
-            server.abort()
-
+        for tracker in self.use_trackers:
+            assert isinstance(tracker, ServerTracker)
+            tracker.abort()
         raise UploadUnhappinessError(msg)
 
 
@@ -921,38 +934,41 @@ class CHKUploader:
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (upload_servers, already_servers), encoder):
+    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
         """
-        @param upload_servers: a sequence of ServerTracker objects that
-                               have agreed to hold some shares for us (the
-                               shareids are stashed inside the ServerTracker)
-        @paran already_servers: a dict mapping sharenum to a set of serverids
-                                that claim to already have this share
+        @param upload_trackers: a sequence of ServerTracker objects that
+                                have agreed to hold some shares for us (the
+                                shareids are stashed inside the ServerTracker)
+
+        @param already_serverids: a dict mapping sharenum to a set of
+                                  serverids for servers that claim to already
+                                  have this share
         """
-        msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
-        values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
-            for s in upload_servers], already_servers)
+        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
+        values = ([', '.join([str_shareloc(k,v)
+                              for k,v in st.buckets.iteritems()])
+                   for st in upload_trackers], already_serverids)
         self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
-        self._results.preexisting_shares = len(already_servers)
+        self._results.preexisting_shares = len(already_serverids)
 
         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
-        for server in upload_servers:
-            assert isinstance(server, ServerTracker)
+        for tracker in upload_trackers:
+            assert isinstance(tracker, ServerTracker)
         buckets = {}
-        servermap = already_servers.copy()
-        for server in upload_servers:
-            buckets.update(server.buckets)
-            for shnum in server.buckets:
-                self._server_trackers[shnum] = server
-                servermap.setdefault(shnum, set()).add(server.serverid)
-        assert len(buckets) == sum([len(server.buckets)
-                                    for server in upload_servers]), \
+        servermap = already_serverids.copy()
+        for tracker in upload_trackers:
+            buckets.update(tracker.buckets)
+            for shnum in tracker.buckets:
+                self._server_trackers[shnum] = tracker
+                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
+        assert len(buckets) == sum([len(tracker.buckets)
+                                    for tracker in upload_trackers]), \
             "%s (%s) != %s (%s)" % (
                 len(buckets),
                 buckets,
-                sum([len(server.buckets) for server in upload_servers]),
-                [(s.buckets, s.serverid) for s in upload_servers]
+                sum([len(tracker.buckets) for tracker in upload_trackers]),
+                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                 )
         encoder.set_shareholders(buckets, servermap)
 
@@ -961,7 +977,7 @@ class CHKUploader:
         r = self._results
         for shnum in self._encoder.get_shares_placed():
             server_tracker = self._server_trackers[shnum]
-            serverid = server_tracker.serverid
+            serverid = server_tracker.get_serverid()
             r.sharemap.add(shnum, serverid)
             r.servermap.add(serverid, shnum)
         r.pushed_shares = len(self._encoder.get_shares_placed())
@@ -1393,9 +1409,10 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
-    def __init__(self, helper_furl=None, stats_provider=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
+        self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
@@ -1431,7 +1448,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
 
 
-    def upload(self, uploadable, history=None):
+    def upload(self, uploadable):
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
@@ -1467,8 +1484,8 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None
-                if history:
-                    history.add_upload(uploader.get_upload_status())
+                if self._history:
+                    self._history.add_upload(uploader.get_upload_status())
                 def turn_verifycap_into_read_cap(uploadresults):
                     # Generate the uri from the verifycap plus the key.
                     d3 = uploadable.get_encryption_key()
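
Note: the net effect of the #1079 change is that History is wired in once, at construction time, instead of being threaded through every upload() call. A sketch (the client and history objects are assumed here, not shown in this diff):

    uploader = Uploader(helper_furl=None, stats_provider=None,
                        history=history)      # captured once, at creation
    uploader.setServiceParent(client)         # Uploader is a MultiService
    d = uploader.upload(uploadable)           # was: upload(uploadable, history)
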