]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/upload.py
Let Uploader retain History instead of passing it into upload(). Fixes #1079.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
index cf275b899b1c6f4a78f48f01d79cfb76f7c1ce86..013aca97cbc6a9ce2c221531cc9e444f5bfdd33e 100644 (file)
@@ -1,38 +1,33 @@
-
 import os, time, weakref, itertools
 from zope.interface import implements
 from twisted.python import failure
 from twisted.internet import defer
 from twisted.application import service
-from foolscap import Referenceable, Copyable, RemoteCopy
-from foolscap import eventual
-from foolscap.logging import log
+from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
 
 from allmydata.util.hashutil import file_renewal_secret_hash, \
      file_cancel_secret_hash, bucket_renewal_secret_hash, \
      bucket_cancel_secret_hash, plaintext_hasher, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
-from allmydata import storage, hashtree, uri
+from allmydata import hashtree, uri
+from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
-from allmydata.util import base32, idlib, mathutil
+from allmydata.util import base32, dictutil, idlib, log, mathutil
+from allmydata.util.happinessutil import servers_of_happiness, \
+                                         shares_by_server, merge_servers, \
+                                         failure_message
 from allmydata.util.assertutil import precondition
+from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
-     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
+     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
+     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
+     DEFAULT_MAX_SEGMENT_SIZE
+from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
 from cStringIO import StringIO
 
 
-KiB=1024
-MiB=1024*KiB
-GiB=1024*MiB
-TiB=1024*GiB
-PiB=1024*TiB
-
-class HaveAllPeersError(Exception):
-    # we use this to jump out of the loop
-    pass
-
 # this wants to live in storage, not here
 class TooFullError(Exception):
     pass
@@ -45,10 +40,15 @@ class UploadResults(Copyable, RemoteCopy):
     typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
     copytype = typeToCopy
 
+    # also, think twice about changing the shape of any existing attribute,
+    # because instances of this class are sent from the helper to its client,
+    # so changing this may break compatibility. Consider adding new fields
+    # instead of modifying existing ones.
+
     def __init__(self):
         self.timings = {} # dict of name to number of seconds
-        self.sharemap = {} # dict of shnum to placement string
-        self.servermap = {} # dict of peerid to set(shnums)
+        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
         self.file_size = None
         self.ciphertext_fetched = None # how much the helper fetched
         self.uri = None
@@ -65,22 +65,24 @@ EXTENSION_SIZE = 1000
 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
 # this.
 
-class PeerTracker:
-    def __init__(self, peerid, storage_server,
+def pretty_print_shnum_to_servers(s):
+    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
+
+class ServerTracker:
+    def __init__(self, server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
                  bucket_renewal_secret, bucket_cancel_secret):
-        precondition(isinstance(peerid, str), peerid)
-        precondition(len(peerid) == 20, peerid)
-        self.peerid = peerid
-        self._storageserver = storage_server # to an RIStorageServer
+        self._server = server
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
-        self.allocated_size = storage.allocated_size(sharesize,
-                                                     num_segments,
-                                                     num_share_hashes,
-                                                     EXTENSION_SIZE)
 
+        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
+                                             blocksize, num_segments,
+                                             num_share_hashes,
+                                             EXTENSION_SIZE)
+        self.wbp_class = wbp.__class__ # to create more of them
+        self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
         self.num_segments = num_segments
         self.num_share_hashes = num_share_hashes
@@ -90,79 +92,109 @@ class PeerTracker:
         self.cancel_secret = bucket_cancel_secret
 
     def __repr__(self):
-        return ("<PeerTracker for peer %s and SI %s>"
-                % (idlib.shortnodeid_b2a(self.peerid),
-                   storage.si_b2a(self.storage_index)[:5]))
+        return ("<ServerTracker for server %s and SI %s>"
+                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+
+    def get_serverid(self):
+        return self._server.get_serverid()
+    def get_name(self):
+        return self._server.get_name()
 
     def query(self, sharenums):
-        d = self._storageserver.callRemote("allocate_buckets",
-                                           self.storage_index,
-                                           self.renew_secret,
-                                           self.cancel_secret,
-                                           sharenums,
-                                           self.allocated_size,
-                                           canary=Referenceable())
+        rref = self._server.get_rref()
+        d = rref.callRemote("allocate_buckets",
+                            self.storage_index,
+                            self.renew_secret,
+                            self.cancel_secret,
+                            sharenums,
+                            self.allocated_size,
+                            canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
 
+    def ask_about_existing_shares(self):
+        rref = self._server.get_rref()
+        return rref.callRemote("get_buckets", self.storage_index)
+
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
-            bp = storage.WriteBucketProxy(rref, self.sharesize,
-                                          self.blocksize,
-                                          self.num_segments,
-                                          self.num_share_hashes,
-                                          EXTENSION_SIZE,
-                                          self.peerid)
+            bp = self.wbp_class(rref, self._server, self.sharesize,
+                                self.blocksize,
+                                self.num_segments,
+                                self.num_share_hashes,
+                                EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
 
-class Tahoe2PeerSelector:
+
+    def abort(self):
+        """
+        I abort the remote bucket writers for all shares. This is a good idea
+        to conserve space on the storage server.
+        """
+        self.abort_some_buckets(self.buckets.keys())
+
+    def abort_some_buckets(self, sharenums):
+        """
+        I abort the remote bucket writers for the share numbers in sharenums.
+        """
+        for sharenum in sharenums:
+            if sharenum in self.buckets:
+                self.buckets[sharenum].abort()
+                del self.buckets[sharenum]
+
+
+def str_shareloc(shnum, bucketwriter):
+    return "%s: %s" % (shnum, bucketwriter.get_servername(),)
+
+class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     def __init__(self, upload_id, logparent=None, upload_status=None):
         self.upload_id = upload_id
         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
+        # Servers that are working normally, but full.
+        self.full_count = 0
         self.error_count = 0
-        self.num_peers_contacted = 0
+        self.num_servers_contacted = 0
         self.last_failure_msg = None
         self._status = IUploadStatus(upload_status)
-        self._log_parent = log.msg("%s starting" % self, parent=logparent)
+        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+        self.log("starting", level=log.OPERATIONAL)
 
     def __repr__(self):
-        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
 
-    def get_shareholders(self, client,
+    def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
-                         num_segments, total_shares, shares_of_happiness):
+                         num_segments, total_shares, needed_shares,
+                         servers_of_happiness):
         """
-        @return: (used_peers, already_peers), where used_peers is a set of
-                 PeerTracker instances that have agreed to hold some shares
-                 for us (the shnum is stashed inside the PeerTracker),
-                 and already_peers is a dict mapping shnum to a peer
-                 which claims to already have the share.
+        @return: (upload_trackers, already_serverids), where upload_trackers
+                 is a set of ServerTracker instances that have agreed to hold
+                 some shares for us (the shareids are stashed inside the
+                 ServerTracker), and already_serverids is a dict mapping
+                 shnum to a set of serverids for servers which claim to
+                 already have the share.
         """
 
         if self._status:
-            self._status.set_status("Contacting Peers..")
+            self._status.set_status("Contacting Servers..")
 
         self.total_shares = total_shares
-        self.shares_of_happiness = shares_of_happiness
+        self.servers_of_happiness = servers_of_happiness
+        self.needed_shares = needed_shares
 
-        self.homeless_shares = range(total_shares)
-        # self.uncontacted_peers = list() # peers we haven't asked yet
-        self.contacted_peers = [] # peers worth asking again
-        self.contacted_peers2 = [] # peers that we have asked again
-        self._started_second_pass = False
-        self.use_peers = set() # PeerTrackers that have shares assigned to them
-        self.preexisting_shares = {} # sharenum -> peerid holding the share
-
-        peers = client.get_permuted_peers("storage", storage_index)
-        if not peers:
-            raise encode.NotEnoughSharesError("client gave us zero peers")
+        self.homeless_shares = set(range(total_shares))
+        self.use_trackers = set() # ServerTrackers that have shares assigned
+                                  # to them
+        self.preexisting_shares = {} # shareid => set(serverids) holding shareid
 
-        # figure out how much space to ask for
+        # These servers have shares -- any shares -- for our SI. We keep
+        # track of these to write an error message with them later.
+        self.serverids_with_shares = set()
 
         # this needed_hashes computation should mirror
         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -171,187 +203,369 @@ class Tahoe2PeerSelector:
         ht = hashtree.IncompleteHashTree(total_shares)
         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
 
+        # figure out how much space to ask for
+        wbp = layout.make_write_bucket_proxy(None, None,
+                                             share_size, 0, num_segments,
+                                             num_share_hashes, EXTENSION_SIZE)
+        allocated_size = wbp.get_allocated_size()
+        all_servers = storage_broker.get_servers_for_psi(storage_index)
+        if not all_servers:
+            raise NoServersError("client gave us zero servers")
+
+        # filter the list of servers according to which ones can accomodate
+        # this request. This excludes older servers (which used a 4-byte size
+        # field) from getting large shares (for files larger than about
+        # 12GiB). See #439 for details.
+        def _get_maxsize(server):
+            v0 = server.get_rref().version
+            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
+            return v1["maximum-immutable-share-size"]
+        writeable_servers = [server for server in all_servers
+                            if _get_maxsize(server) >= allocated_size]
+        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
+
         # decide upon the renewal/cancel secrets, to include them in the
-        # allocat_buckets query.
-        client_renewal_secret = client.get_renewal_secret()
-        client_cancel_secret = client.get_cancel_secret()
+        # allocate_buckets query.
+        client_renewal_secret = secret_holder.get_renewal_secret()
+        client_cancel_secret = secret_holder.get_cancel_secret()
 
         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                        storage_index)
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
+        def _make_trackers(servers):
+            trackers = []
+            for s in servers:
+                seed = s.get_lease_seed()
+                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
+                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
+                st = ServerTracker(s,
+                                   share_size, block_size,
+                                   num_segments, num_share_hashes,
+                                   storage_index,
+                                   renew, cancel)
+                trackers.append(st)
+            return trackers
+
+        # We assign each servers/trackers into one three lists. They all
+        # start in the "first pass" list. During the first pass, as we ask
+        # each one to hold a share, we move their tracker to the "second
+        # pass" list, until the first-pass list is empty. Then during the
+        # second pass, as we ask each to hold more shares, we move their
+        # tracker to the "next pass" list, until the second-pass list is
+        # empty. Then we move everybody from the next-pass list back to the
+        # second-pass list and repeat the "second" pass (really the third,
+        # fourth, etc pass), until all shares are assigned, or we've run out
+        # of potential servers.
+        self.first_pass_trackers = _make_trackers(writeable_servers)
+        self.second_pass_trackers = [] # servers worth asking again
+        self.next_pass_trackers = [] # servers that we have asked again
+        self._started_second_pass = False
+
+        # We don't try to allocate shares to these servers, since they've
+        # said that they're incapable of storing shares of the size that we'd
+        # want to store. We ask them about existing shares for this storage
+        # index, which we want to know about for accurate
+        # servers_of_happiness accounting, then we forget about them.
+        readonly_trackers = _make_trackers(readonly_servers)
+
+        # We now ask servers that can't hold any new shares about existing
+        # shares that they might have for our SI. Once this is done, we
+        # start placing the shares that we haven't already accounted
+        # for.
+        ds = []
+        if self._status and readonly_trackers:
+            self._status.set_status("Contacting readonly servers to find "
+                                    "any existing shares")
+        for tracker in readonly_trackers:
+            assert isinstance(tracker, ServerTracker)
+            d = tracker.ask_about_existing_shares()
+            d.addBoth(self._handle_existing_response, tracker)
+            ds.append(d)
+            self.num_servers_contacted += 1
+            self.query_count += 1
+            self.log("asking server %s for any existing shares" %
+                     (tracker.get_name(),), level=log.NOISY)
+        dl = defer.DeferredList(ds)
+        dl.addCallback(lambda ign: self._loop())
+        return dl
+
+
+    def _handle_existing_response(self, res, tracker):
+        """
+        I handle responses to the queries sent by
+        Tahoe2ServerSelector._existing_shares.
+        """
+        serverid = tracker.get_serverid()
+        if isinstance(res, failure.Failure):
+            self.log("%s got error during existing shares check: %s"
+                    % (tracker.get_name(), res), level=log.UNUSUAL)
+            self.error_count += 1
+            self.bad_query_count += 1
+        else:
+            buckets = res
+            if buckets:
+                self.serverids_with_shares.add(serverid)
+            self.log("response to get_buckets() from server %s: alreadygot=%s"
+                    % (tracker.get_name(), tuple(sorted(buckets))),
+                    level=log.NOISY)
+            for bucket in buckets:
+                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
+                self.homeless_shares.discard(bucket)
+            self.full_count += 1
+            self.bad_query_count += 1
+
+
+    def _get_progress_message(self):
+        if not self.homeless_shares:
+            msg = "placed all %d shares, " % (self.total_shares)
+        else:
+            msg = ("placed %d shares out of %d total (%d homeless), " %
+                   (self.total_shares - len(self.homeless_shares),
+                    self.total_shares,
+                    len(self.homeless_shares)))
+        return (msg + "want to place shares on at least %d servers such that "
+                      "any %d of them have enough shares to recover the file, "
+                      "sent %d queries to %d servers, "
+                      "%d queries placed some shares, %d placed none "
+                      "(of which %d placed none due to the server being"
+                      " full and %d placed none due to an error)" %
+                        (self.servers_of_happiness, self.needed_shares,
+                         self.query_count, self.num_servers_contacted,
+                         self.good_query_count, self.bad_query_count,
+                         self.full_count, self.error_count))
 
-        trackers = [ PeerTracker(peerid, conn,
-                                 share_size, block_size,
-                                 num_segments, num_share_hashes,
-                                 storage_index,
-                                 bucket_renewal_secret_hash(file_renewal_secret,
-                                                            peerid),
-                                 bucket_cancel_secret_hash(file_cancel_secret,
-                                                           peerid),
-                                 )
-                     for (peerid, conn) in peers ]
-        self.uncontacted_peers = trackers
-
-        d = defer.maybeDeferred(self._loop)
-        return d
 
     def _loop(self):
         if not self.homeless_shares:
-            # all done
-            msg = ("placed all %d shares, "
-                   "sent %d queries to %d peers, "
-                   "%d queries placed some shares, %d placed none, "
-                   "got %d errors" %
-                   (self.total_shares,
-                    self.query_count, self.num_peers_contacted,
-                    self.good_query_count, self.bad_query_count,
-                    self.error_count))
-            log.msg("peer selection successful for %s: %s" % (self, msg),
-                    parent=self._log_parent)
-            return (self.use_peers, self.preexisting_shares)
-
-        if self.uncontacted_peers:
-            peer = self.uncontacted_peers.pop(0)
-            # TODO: don't pre-convert all peerids to PeerTrackers
-            assert isinstance(peer, PeerTracker)
-
-            shares_to_ask = set([self.homeless_shares.pop(0)])
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
+            effective_happiness = servers_of_happiness(merged)
+            if self.servers_of_happiness <= effective_happiness:
+                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
+                       "self.use_trackers: %s, self.preexisting_shares: %s") \
+                       % (self, self._get_progress_message(),
+                          pretty_print_shnum_to_servers(merged),
+                          [', '.join([str_shareloc(k,v)
+                                      for k,v in st.buckets.iteritems()])
+                           for st in self.use_trackers],
+                          pretty_print_shnum_to_servers(self.preexisting_shares))
+                self.log(msg, level=log.OPERATIONAL)
+                return (self.use_trackers, self.preexisting_shares)
+            else:
+                # We're not okay right now, but maybe we can fix it by
+                # redistributing some shares. In cases where one or two
+                # servers has, before the upload, all or most of the
+                # shares for a given SI, this can work by allowing _loop
+                # a chance to spread those out over the other servers,
+                delta = self.servers_of_happiness - effective_happiness
+                shares = shares_by_server(self.preexisting_shares)
+                # Each server in shares maps to a set of shares stored on it.
+                # Since we want to keep at least one share on each server
+                # that has one (otherwise we'd only be making
+                # the situation worse by removing distinct servers),
+                # each server has len(its shares) - 1 to spread around.
+                shares_to_spread = sum([len(list(sharelist)) - 1
+                                        for (server, sharelist)
+                                        in shares.items()])
+                if delta <= len(self.first_pass_trackers) and \
+                   shares_to_spread >= delta:
+                    items = shares.items()
+                    while len(self.homeless_shares) < delta:
+                        # Loop through the allocated shares, removing
+                        # one from each server that has more than one
+                        # and putting it back into self.homeless_shares
+                        # until we've done this delta times.
+                        server, sharelist = items.pop()
+                        if len(sharelist) > 1:
+                            share = sharelist.pop()
+                            self.homeless_shares.add(share)
+                            self.preexisting_shares[share].remove(server)
+                            if not self.preexisting_shares[share]:
+                                del self.preexisting_shares[share]
+                            items.append((server, sharelist))
+                        for writer in self.use_trackers:
+                            writer.abort_some_buckets(self.homeless_shares)
+                    return self._loop()
+                else:
+                    # Redistribution won't help us; fail.
+                    server_count = len(self.serverids_with_shares)
+                    failmsg = failure_message(server_count,
+                                              self.needed_shares,
+                                              self.servers_of_happiness,
+                                              effective_happiness)
+                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
+                    servmsg = servmsgtempl % (
+                        self,
+                        failmsg,
+                        self._get_progress_message(),
+                        pretty_print_shnum_to_servers(merged)
+                        )
+                    self.log(servmsg, level=log.INFREQUENT)
+                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
+
+        if self.first_pass_trackers:
+            tracker = self.first_pass_trackers.pop(0)
+            # TODO: don't pre-convert all serverids to ServerTrackers
+            assert isinstance(tracker, ServerTracker)
+
+            shares_to_ask = set(sorted(self.homeless_shares)[:1])
+            self.homeless_shares -= shares_to_ask
             self.query_count += 1
-            self.num_peers_contacted += 1
+            self.num_servers_contacted += 1
             if self._status:
-                self._status.set_status("Contacting Peers [%s] (first query),"
+                self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
-            d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask,
-                      self.contacted_peers)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.second_pass_trackers)
             return d
-        elif self.contacted_peers:
-            # ask a peer that we've already asked.
+        elif self.second_pass_trackers:
+            # ask a server that we've already asked.
             if not self._started_second_pass:
-                log.msg("starting second pass", parent=self._log_parent,
+                self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_peers))
-            peer = self.contacted_peers.pop(0)
-            shares_to_ask = set(self.homeless_shares[:num_shares])
-            self.homeless_shares[:num_shares] = []
+                                           len(self.second_pass_trackers))
+            tracker = self.second_pass_trackers.pop(0)
+            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
+            self.homeless_shares -= shares_to_ask
             self.query_count += 1
             if self._status:
-                self._status.set_status("Contacting Peers [%s] (second query),"
+                self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
-            d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask,
-                      self.contacted_peers2)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.next_pass_trackers)
             return d
-        elif self.contacted_peers2:
+        elif self.next_pass_trackers:
             # we've finished the second-or-later pass. Move all the remaining
-            # peers back into self.contacted_peers for the next pass.
-            self.contacted_peers.extend(self.contacted_peers2)
-            self.contacted_peers[:] = []
+            # servers back into self.second_pass_trackers for the next pass.
+            self.second_pass_trackers.extend(self.next_pass_trackers)
+            self.next_pass_trackers[:] = []
             return self._loop()
         else:
-            # no more peers. If we haven't placed enough shares, we fail.
-            placed_shares = self.total_shares - len(self.homeless_shares)
-            if placed_shares < self.shares_of_happiness:
-                msg = ("placed %d shares out of %d total (%d homeless), "
-                       "sent %d queries to %d peers, "
-                       "%d queries placed some shares, %d placed none, "
-                       "got %d errors" %
-                       (self.total_shares - len(self.homeless_shares),
-                        self.total_shares, len(self.homeless_shares),
-                        self.query_count, self.num_peers_contacted,
-                        self.good_query_count, self.bad_query_count,
-                        self.error_count))
-                msg = "peer selection failed for %s: %s" % (self, msg)
+            # no more servers. If we haven't placed enough shares, we fail.
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
+            effective_happiness = servers_of_happiness(merged)
+            if effective_happiness < self.servers_of_happiness:
+                msg = failure_message(len(self.serverids_with_shares),
+                                      self.needed_shares,
+                                      self.servers_of_happiness,
+                                      effective_happiness)
+                msg = ("server selection failed for %s: %s (%s)" %
+                       (self, msg, self._get_progress_message()))
                 if self.last_failure_msg:
                     msg += " (%s)" % (self.last_failure_msg,)
-                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
-                raise encode.NotEnoughSharesError(msg)
+                self.log(msg, level=log.UNUSUAL)
+                return self._failed(msg)
             else:
                 # we placed enough to be happy, so we're done
                 if self._status:
                     self._status.set_status("Placed all shares")
-                return self.use_peers
+                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
+                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
+                self.log(msg, level=log.OPERATIONAL)
+                return (self.use_trackers, self.preexisting_shares)
 
-    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
+    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            log.msg("%s got error during peer selection: %s" % (peer, res),
-                    level=log.UNUSUAL, parent=self._log_parent)
+            self.log("%s got error during server selection: %s" % (tracker, res),
+                    level=log.UNUSUAL)
             self.error_count += 1
-            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
-            if (self.uncontacted_peers
-                or self.contacted_peers
-                or self.contacted_peers2):
+            self.bad_query_count += 1
+            self.homeless_shares |= shares_to_ask
+            if (self.first_pass_trackers
+                or self.second_pass_trackers
+                or self.next_pass_trackers):
                 # there is still hope, so just loop
                 pass
             else:
-                # No more peers, so this upload might fail (it depends upon
-                # whether we've hit shares_of_happiness or not). Log the last
-                # failure we got: if a coding error causes all peers to fail
+                # No more servers, so this upload might fail (it depends upon
+                # whether we've hit servers_of_happiness or not). Log the last
+                # failure we got: if a coding error causes all servers to fail
                 # in the same way, this allows the common failure to be seen
                 # by the uploader and should help with debugging
-                msg = ("last failure (from %s) was: %s" % (peer, res))
+                msg = ("last failure (from %s) was: %s" % (tracker, res))
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
-            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(peer.peerid),
+            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
+                    % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
-                    level=log.NOISY, parent=self._log_parent)
+                    level=log.NOISY)
             progress = False
             for s in alreadygot:
-                self.preexisting_shares[s] = peer.peerid
+                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
+                elif s in shares_to_ask:
+                    progress = True
 
-            # the PeerTracker will remember which shares were allocated on
+            # the ServerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
             if allocated:
-                self.use_peers.add(peer)
+                self.use_trackers.add(tracker)
                 progress = True
 
+            if allocated or alreadygot:
+                self.serverids_with_shares.add(tracker.get_serverid())
+
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
 
             if progress:
-                # they accepted or already had at least one share, so
-                # progress has been made
+                # They accepted at least one of the shares that we asked
+                # them to accept, or they had a share that we didn't ask
+                # them to accept but that we hadn't placed yet, so this
+                # was a productive query
                 self.good_query_count += 1
             else:
                 self.bad_query_count += 1
+                self.full_count += 1
 
             if still_homeless:
                 # In networks with lots of space, this is very unusual and
-                # probably indicates an error. In networks with peers that
+                # probably indicates an error. In networks with servers that
                 # are full, it is merely unusual. In networks that are very
                 # full, it is common, and many uploads will fail. In most
                 # cases, this is obviously not fatal, and we'll just use some
-                # other peers.
+                # other servers.
 
                 # some shares are still homeless, keep trying to find them a
                 # home. The ones that were rejected get first priority.
-                self.homeless_shares = (list(still_homeless)
-                                        + self.homeless_shares)
+                self.homeless_shares |= still_homeless
                 # Since they were unable to accept all of our requests, so it
                 # is safe to assume that asking them again won't help.
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                put_peer_here.append(peer)
+                put_tracker_here.append(tracker)
 
         # now loop
         return self._loop()
 
 
+    def _failed(self, msg):
+        """
+        I am called when server selection fails. I first abort all of the
+        remote buckets that I allocated during my unsuccessful attempt to
+        place shares for this file. I then raise an
+        UploadUnhappinessError with my msg argument.
+        """
+        for tracker in self.use_trackers:
+            assert isinstance(tracker, ServerTracker)
+            tracker.abort()
+        raise UploadUnhappinessError(msg)
+
+
 class EncryptAnUploadable:
     """This is a wrapper that takes an IUploadable and provides
     IEncryptedUploadable."""
@@ -502,7 +716,7 @@ class EncryptAnUploadable:
         # actually synchronous too, we'd blow the stack unless we stall for a
         # tick. Once you accept a Deferred from IUploadable.read(), you must
         # be prepared to have it fire immediately too.
-        d.addCallback(eventual.fireEventually)
+        d.addCallback(fireEventually)
         def _good(plaintext):
             # and encrypt it..
             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
@@ -550,6 +764,7 @@ class EncryptAnUploadable:
 
 
     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
+        # this is currently unused, but will live again when we fix #453
         if len(self._plaintext_segment_hashes) < num_segments:
             # close out the last one
             assert len(self._plaintext_segment_hashes) == num_segments-1
@@ -624,11 +839,13 @@ class UploadStatus:
         self.results = value
 
 class CHKUploader:
-    peer_selector_class = Tahoe2PeerSelector
+    server_selector_class = Tahoe2ServerSelector
 
-    def __init__(self, client):
-        self._client = client
-        self._log_number = self._client.log("CHKUploader starting")
+    def __init__(self, storage_broker, secret_holder):
+        # server_selector needs storage_broker and secret_holder
+        self._storage_broker = storage_broker
+        self._secret_holder = secret_holder
+        self._log_number = self.log("CHKUploader starting", parent=None)
         self._encoder = None
         self._results = UploadResults()
         self._storage_index = None
@@ -637,39 +854,36 @@ class CHKUploader:
         self._upload_status.set_active(True)
         self._upload_status.set_results(self._results)
 
+        # locate_all_shareholders() will create the following attribute:
+        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
         if "facility" not in kwargs:
             kwargs["facility"] = "tahoe.upload"
-        return self._client.log(*args, **kwargs)
+        return log.msg(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable):
         """Start uploading the file.
 
-        This method returns a Deferred that will fire with the URI (a
-        string)."""
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
 
         self._started = time.time()
-        uploadable = IUploadable(uploadable)
-        self.log("starting upload of %s" % uploadable)
+        eu = IEncryptedUploadable(encrypted_uploadable)
+        self.log("starting upload of %s" % eu)
 
-        eu = EncryptAnUploadable(uploadable, self._log_number)
         eu.set_upload_status(self._upload_status)
         d = self.start_encrypted(eu)
-        def _uploaded(res):
-            d1 = uploadable.get_encryption_key()
-            d1.addCallback(lambda key: self._compute_uri(res, key))
-            return d1
-        d.addCallback(_uploaded)
-        def _done(res):
+        def _done(uploadresults):
             self._upload_status.set_active(False)
-            return res
+            return uploadresults
         d.addBoth(_done)
         return d
 
     def abort(self):
-        """Call this is the upload must be abandoned before it completes.
+        """Call this if the upload must be abandoned before it completes.
         This will tell the shareholders to delete their partial shares. I
         return a Deferred that fires when these messages have been acked."""
         if not self._encoder:
@@ -678,6 +892,7 @@ class CHKUploader:
         return self._encoder.abort()
 
     def start_encrypted(self, encrypted):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
@@ -688,92 +903,92 @@ class CHKUploader:
         d.addCallback(self.set_shareholders, e)
         d.addCallback(lambda res: e.start())
         d.addCallback(self._encrypted_done)
-        # this fires with the uri_extension_hash and other data
         return d
 
     def locate_all_shareholders(self, encoder, started):
-        peer_selection_started = now = time.time()
+        server_selection_started = now = time.time()
         self._storage_index_elapsed = now - started
+        storage_broker = self._storage_broker
+        secret_holder = self._secret_holder
         storage_index = encoder.get_param("storage_index")
         self._storage_index = storage_index
-        upload_id = storage.si_b2a(storage_index)[:5]
+        upload_id = si_b2a(storage_index)[:5]
         self.log("using storage index %s" % upload_id)
-        peer_selector = self.peer_selector_class(upload_id, self._log_number,
-                                                 self._upload_status)
+        server_selector = self.server_selector_class(upload_id,
+                                                     self._log_number,
+                                                     self._upload_status)
 
         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
         num_segments = encoder.get_param("num_segments")
         k,desired,n = encoder.get_param("share_counts")
 
-        self._peer_selection_started = time.time()
-        d = peer_selector.get_shareholders(self._client, storage_index,
-                                           share_size, block_size,
-                                           num_segments, n, desired)
+        self._server_selection_started = time.time()
+        d = server_selector.get_shareholders(storage_broker, secret_holder,
+                                             storage_index,
+                                             share_size, block_size,
+                                             num_segments, n, k, desired)
         def _done(res):
-            self._peer_selection_elapsed = time.time() - peer_selection_started
+            self._server_selection_elapsed = time.time() - server_selection_started
             return res
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (used_peers, already_peers), encoder):
+    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
         """
-        @param used_peers: a sequence of PeerTracker objects
-        @paran already_peers: a dict mapping sharenum to a peerid that
-                              claims to already have this share
+        @param upload_trackers: a sequence of ServerTracker objects that
+                                have agreed to hold some shares for us (the
+                                shareids are stashed inside the ServerTracker)
+
+        @paran already_serverids: a dict mapping sharenum to a set of
+                                  serverids for servers that claim to already
+                                  have this share
         """
-        self.log("_send_shares, used_peers is %s" % (used_peers,))
+        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
+        values = ([', '.join([str_shareloc(k,v)
+                              for k,v in st.buckets.iteritems()])
+                   for st in upload_trackers], already_serverids)
+        self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
-        for (shnum, peerid) in already_peers.items():
-            peerid_s = idlib.shortnodeid_b2a(peerid)
-            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
-            if peerid not in self._results.servermap:
-                self._results.servermap[peerid] = set()
-            self._results.servermap[peerid].add(shnum)
-        self._results.preexisting_shares = len(already_peers)
-
-        self._sharemap = {}
-        for peer in used_peers:
-            assert isinstance(peer, PeerTracker)
+        self._results.preexisting_shares = len(already_serverids)
+
+        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
+        for tracker in upload_trackers:
+            assert isinstance(tracker, ServerTracker)
         buckets = {}
-        for peer in used_peers:
-            buckets.update(peer.buckets)
-            for shnum in peer.buckets:
-                self._sharemap[shnum] = peer
-        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
-        encoder.set_shareholders(buckets)
-
-    def _encrypted_done(self, res):
+        servermap = already_serverids.copy()
+        for tracker in upload_trackers:
+            buckets.update(tracker.buckets)
+            for shnum in tracker.buckets:
+                self._server_trackers[shnum] = tracker
+                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
+        assert len(buckets) == sum([len(tracker.buckets)
+                                    for tracker in upload_trackers]), \
+            "%s (%s) != %s (%s)" % (
+                len(buckets),
+                buckets,
+                sum([len(tracker.buckets) for tracker in upload_trackers]),
+                [(t.buckets, t.get_serverid()) for t in upload_trackers]
+                )
+        encoder.set_shareholders(buckets, servermap)
+
+    def _encrypted_done(self, verifycap):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         r = self._results
         for shnum in self._encoder.get_shares_placed():
-            peer_tracker = self._sharemap[shnum]
-            peerid = peer_tracker.peerid
-            peerid_s = idlib.shortnodeid_b2a(peerid)
-            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
-            if peerid not in r.servermap:
-                r.servermap[peerid] = set()
-            r.servermap[peerid].add(shnum)
+            server_tracker = self._server_trackers[shnum]
+            serverid = server_tracker.get_serverid()
+            r.sharemap.add(shnum, serverid)
+            r.servermap.add(serverid, shnum)
         r.pushed_shares = len(self._encoder.get_shares_placed())
         now = time.time()
         r.file_size = self._encoder.file_size
         r.timings["total"] = now - self._started
         r.timings["storage_index"] = self._storage_index_elapsed
-        r.timings["peer_selection"] = self._peer_selection_elapsed
+        r.timings["peer_selection"] = self._server_selection_elapsed
         r.timings.update(self._encoder.get_times())
         r.uri_extension_data = self._encoder.get_uri_extension_data()
-        return res
-
-    def _compute_uri(self, (uri_extension_hash,
-                            needed_shares, total_shares, size),
-                     key):
-        u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=needed_shares,
-                           total_shares=total_shares,
-                           size=size,
-                           )
-        r = self._results
-        r.uri = u.to_string()
+        r.verifycapstr = verifycap.to_string()
         return r
 
     def get_upload_status(self):
@@ -798,8 +1013,7 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
 
 class LiteralUploader:
 
-    def __init__(self, client):
-        self._client = client
+    def __init__(self):
         self._results = UploadResults()
         self._status = s = UploadStatus()
         s.set_storage_index(None)
@@ -824,7 +1038,7 @@ class LiteralUploader:
 
     def _build_results(self, uri):
         self._results.uri = uri
-        self._status.set_status("Done")
+        self._status.set_status("Finished")
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
         return self._results
@@ -902,15 +1116,6 @@ class RemoteEncryptedUploadable(Referenceable):
         d.addCallback(_read)
         return d
 
-    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
-        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
-                (first, last-1, num_segments),
-                level=log.NOISY)
-        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
-        d.addCallback(list)
-        return d
-    def remote_get_plaintext_hash(self):
-        return self._eu.get_plaintext_hash()
     def remote_close(self):
         return self._eu.close()
 
@@ -930,26 +1135,23 @@ class AssistedUploader:
             kwargs["parent"] = self._log_number
         return log.msg(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable, storage_index):
+        """Start uploading the file.
+
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
+        precondition(isinstance(storage_index, str), storage_index)
         self._started = time.time()
-        u = IUploadable(uploadable)
-        eu = EncryptAnUploadable(u, self._log_number)
+        eu = IEncryptedUploadable(encrypted_uploadable)
         eu.set_upload_status(self._upload_status)
         self._encuploadable = eu
+        self._storage_index = storage_index
         d = eu.get_size()
         d.addCallback(self._got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
-        # when we get the encryption key, that will also compute the storage
-        # index, so this only takes one pass.
-        # TODO: I'm not sure it's cool to switch back and forth between
-        # the Uploadable and the IEncryptedUploadable that wraps it.
-        d.addCallback(lambda res: u.get_encryption_key())
-        d.addCallback(self._got_encryption_key)
-        d.addCallback(lambda res: eu.get_storage_index())
-        d.addCallback(self._got_storage_index)
         d.addCallback(self._contact_helper)
-        d.addCallback(self._build_readcap)
+        d.addCallback(self._build_verifycap)
         def _done(res):
             self._upload_status.set_active(False)
             return res
@@ -967,18 +1169,11 @@ class AssistedUploader:
         self._total_shares = n
         self._segment_size = segment_size
 
-    def _got_encryption_key(self, key):
-        self._key = key
-
-    def _got_storage_index(self, storage_index):
-        self._storage_index = storage_index
-
-
     def _contact_helper(self, res):
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
         self.log(format="contacting helper for SI %(si)s..",
-                 si=storage.si_b2a(self._storage_index))
+                 si=si_b2a(self._storage_index), level=log.NOISY)
         self._upload_status.set_status("Contacting Helper")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
@@ -989,7 +1184,7 @@ class AssistedUploader:
         elapsed = now - self._time_contacting_helper_start
         self._elapsed_time_contacting_helper = elapsed
         if upload_helper:
-            self.log("helper says we need to upload")
+            self.log("helper says we need to upload", level=log.NOISY)
             self._upload_status.set_status("Uploading Ciphertext")
             # we need to upload the file
             reu = RemoteEncryptedUploadable(self._encuploadable,
@@ -1000,26 +1195,42 @@ class AssistedUploader:
                           upload_helper.callRemote("upload", reu))
             # this Deferred will fire with the upload results
             return d
-        self.log("helper says file is already uploaded")
+        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
         self._upload_status.set_progress(1, 1.0)
         self._upload_status.set_results(upload_results)
         return upload_results
 
-    def _build_readcap(self, upload_results):
-        self.log("upload finished, building readcap")
+    def _convert_old_upload_results(self, upload_results):
+        # pre-1.3.0 helpers return upload results which contain a mapping
+        # from shnum to a single human-readable string, containing things
+        # like "Found on [x],[y],[z]" (for healthy files that were already in
+        # the grid), "Found on [x]" (for files that needed upload but which
+        # discovered pre-existing shares), and "Placed on [x]" (for newly
+        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
+        # set of binary serverid strings.
+
+        # the old results are too hard to deal with (they don't even contain
+        # as much information as the new results, since the nodeids are
+        # abbreviated), so if we detect old results, just clobber them.
+
+        sharemap = upload_results.sharemap
+        if str in [type(v) for v in sharemap.values()]:
+            upload_results.sharemap = None
+
+    def _build_verifycap(self, upload_results):
+        self.log("upload finished, building readcap", level=log.OPERATIONAL)
+        self._convert_old_upload_results(upload_results)
         self._upload_status.set_status("Building Readcap")
         r = upload_results
         assert r.uri_extension_data["needed_shares"] == self._needed_shares
         assert r.uri_extension_data["total_shares"] == self._total_shares
         assert r.uri_extension_data["segment_size"] == self._segment_size
         assert r.uri_extension_data["size"] == self._size
-        u = uri.CHKFileURI(key=self._key,
-                           uri_extension_hash=r.uri_extension_hash,
-                           needed_shares=self._needed_shares,
-                           total_shares=self._total_shares,
-                           size=self._size,
-                           )
-        r.uri = u.to_string()
+        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
+                                             uri_extension_hash=r.uri_extension_hash,
+                                             needed_shares=self._needed_shares,
+                                             total_shares=self._total_shares, size=self._size
+                                             ).to_string()
         now = time.time()
         r.file_size = self._size
         r.timings["storage_index"] = self._storage_index_elapsed
@@ -1027,7 +1238,7 @@ class AssistedUploader:
         if "total" in r.timings:
             r.timings["helper_total"] = r.timings["total"]
         r.timings["total"] = now - self._started
-        self._upload_status.set_status("Done")
+        self._upload_status.set_status("Finished")
         self._upload_status.set_results(r)
         return r
 
@@ -1035,7 +1246,8 @@ class AssistedUploader:
         return self._upload_status
 
 class BaseUploadable:
-    default_max_segment_size = 128*KiB # overridden by max_segment_size
+    # this is overridden by max_segment_size
+    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
     default_encoding_param_k = 3 # overridden by encoding_parameters
     default_encoding_param_happy = 7
     default_encoding_param_n = 10
@@ -1189,23 +1401,21 @@ class Data(FileHandle):
         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
         FileHandle.__init__(self, StringIO(data), convergence=convergence)
 
-class Uploader(service.MultiService):
+class Uploader(service.MultiService, log.PrefixingLogMixin):
     """I am a service that allows file uploading. I am a service-child of the
     Client.
     """
     implements(IUploader)
     name = "uploader"
-    uploader_class = CHKUploader
     URI_LIT_SIZE_THRESHOLD = 55
-    MAX_UPLOAD_STATUSES = 10
 
-    def __init__(self, helper_furl=None, stats_provider=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
+        self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
-        self._all_upload_statuses = weakref.WeakKeyDictionary()
-        self._recent_upload_statuses = []
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
     def startService(self):
@@ -1215,8 +1425,21 @@ class Uploader(service.MultiService):
                                       self._got_helper)
 
     def _got_helper(self, helper):
+        self.log("got helper connection, getting versions")
+        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
+                    { },
+                    "application-version": "unknown: no get_version()",
+                    }
+        d = add_version_to_remote_reference(helper, default)
+        d.addCallback(self._got_versioned_helper)
+
+    def _got_versioned_helper(self, helper):
+        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
+        if needed not in helper.version:
+            raise InsufficientVersionError(needed, helper.version)
         self._helper = helper
         helper.notifyOnDisconnect(self._lost_helper)
+
     def _lost_helper(self):
         self._helper = None
 
@@ -1224,8 +1447,11 @@ class Uploader(service.MultiService):
         # return a tuple of (helper_furl_or_None, connected_bool)
         return (self._helper_furl, bool(self._helper))
 
+
     def upload(self, uploadable):
-        # this returns the URI
+        """
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
         assert self.parent
         assert self.running
 
@@ -1242,28 +1468,39 @@ class Uploader(service.MultiService):
                 self.stats_provider.count('uploader.bytes_uploaded', size)
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader = LiteralUploader(self.parent)
-            elif self._helper:
-                uploader = AssistedUploader(self._helper)
+                uploader = LiteralUploader()
+                return uploader.start(uploadable)
             else:
-                uploader = self.uploader_class(self.parent)
-            self._add_upload(uploader)
-            return uploader.start(uploadable)
+                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
+                d2 = defer.succeed(None)
+                if self._helper:
+                    uploader = AssistedUploader(self._helper)
+                    d2.addCallback(lambda x: eu.get_storage_index())
+                    d2.addCallback(lambda si: uploader.start(eu, si))
+                else:
+                    storage_broker = self.parent.get_storage_broker()
+                    secret_holder = self.parent._secret_holder
+                    uploader = CHKUploader(storage_broker, secret_holder)
+                    d2.addCallback(lambda x: uploader.start(eu))
+
+                self._all_uploads[uploader] = None
+                if self._history:
+                    self._history.add_upload(uploader.get_upload_status())
+                def turn_verifycap_into_read_cap(uploadresults):
+                    # Generate the uri from the verifycap plus the key.
+                    d3 = uploadable.get_encryption_key()
+                    def put_readcap_into_results(key):
+                        v = uri.from_string(uploadresults.verifycapstr)
+                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
+                        uploadresults.uri = r.to_string()
+                        return uploadresults
+                    d3.addCallback(put_readcap_into_results)
+                    return d3
+                d2.addCallback(turn_verifycap_into_read_cap)
+                return d2
         d.addCallback(_got_size)
         def _done(res):
             uploadable.close()
             return res
         d.addBoth(_done)
         return d
-
-    def _add_upload(self, uploader):
-        s = uploader.get_upload_status()
-        self._all_uploads[uploader] = None
-        self._all_upload_statuses[s] = None
-        self._recent_upload_statuses.append(s)
-        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
-            self._recent_upload_statuses.pop(0)
-
-    def list_all_upload_statuses(self):
-        for us in self._all_upload_statuses:
-            yield us