From f4a718c5b6684c2031832107fdc047a447fff610 Mon Sep 17 00:00:00 2001
From: Zooko O'Whielacronx <zooko@zooko.com>
Date: Fri, 30 Mar 2007 10:52:19 -0700
Subject: [PATCH] finish storage server and write new download

---
 src/allmydata/bucketstore.py      | 149 -------------------
 src/allmydata/codec.py            |  10 +-
 src/allmydata/debugshell.py       |  16 ---
 src/allmydata/download.py         | 228 +++++++++++++++++++++---------
 src/allmydata/interfaces.py       |  14 +-
 src/allmydata/storageserver.py    |  40 +++---
 src/allmydata/test/test_upload.py | 218 ++--------------------------
 src/allmydata/upload.py           |  12 +-
 src/allmydata/uri.py              |  13 +-
 9 files changed, 228 insertions(+), 472 deletions(-)
 delete mode 100644 src/allmydata/bucketstore.py

diff --git a/src/allmydata/bucketstore.py b/src/allmydata/bucketstore.py
deleted file mode 100644
index 6c055e78..00000000
--- a/src/allmydata/bucketstore.py
+++ /dev/null
@@ -1,149 +0,0 @@
-import os
-
-from foolscap import Referenceable
-from twisted.application import service
-#from twisted.python import log
-from allmydata.util import idlib
-from zope.interface import implements
-from allmydata.interfaces import RIBucketWriter, RIBucketReader
-
-from allmydata.util.assertutil import precondition, _assert
-
-class BucketStore(service.MultiService, Referenceable):
-    def __init__(self, store_dir):
-        precondition(os.path.isdir(store_dir))
-        service.MultiService.__init__(self)
-        self._store_dir = store_dir
-
-        self._leases = set() # should do weakref dances.
-
-    def _get_bucket_dir(self, verifierid):
-        avid = idlib.b2a(verifierid)
-        return os.path.join(self._store_dir, avid)
-
-    def has_bucket(self, verifierid):
-        return os.path.exists(self._get_bucket_dir(verifierid))
-
-    def allocate_bucket(self, verifierid, bucket_num, size,
-                        leaser_credentials, canary):
-        bucket_dir = self._get_bucket_dir(verifierid)
-        precondition(not os.path.exists(bucket_dir))
-        precondition(isinstance(bucket_num, int))
-        bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
-        bucket.set_leaser(leaser_credentials)
-        lease = Lease(verifierid, leaser_credentials, bucket, canary)
-        self._leases.add(lease)
-        return lease
-
-    def get_buckets(self, verifierid):
-        # for now, only returns those created by this process, in this run
-        bucket_dir = self._get_bucket_dir(verifierid)
-        if os.path.exists(bucket_dir):
-            b = ReadBucket(bucket_dir, verifierid)
-            return [(b.get_bucket_num(), b)]
-        else:
-            return []
-
-class Lease(Referenceable):
-    implements(RIBucketWriter)
-
-    def __init__(self, verifierid, leaser, bucket, canary):
-        self._leaser = leaser
-        self._verifierid = verifierid
-        self._bucket = bucket
-        canary.notifyOnDisconnect(self._lost_canary)
-
-    def get_bucket(self):
-        return self._bucket
-
-    def remote_write(self, data):
-        self._bucket.write(data)
-
-    def remote_set_metadata(self, metadata):
-        self._bucket.set_metadata(metadata)
-
-    def remote_close(self):
-        self._bucket.close()
-
-    def _lost_canary(self):
-        pass
-
-class Bucket:
-    def __init__(self, bucket_dir, verifierid):
-        self._bucket_dir = bucket_dir
-        self._verifierid = verifierid
-
-    def _write_attr(self, name, val):
-        f = file(os.path.join(self._bucket_dir, name), 'wb')
-        f.write(val)
-        f.close()
-
-    def _read_attr(self, name):
-        f = file(os.path.join(self._bucket_dir, name), 'rb')
-        data = f.read()
-        f.close()
-        return data
-
-    def is_complete(self):
-        return os.path.exists(os.path.join(self._bucket_dir, 'closed'))
-
-class WriteBucket(Bucket):
-    def __init__(self, bucket_dir, verifierid, bucket_num, size):
-        Bucket.__init__(self, bucket_dir, verifierid)
-        precondition(not os.path.exists(bucket_dir))
-        #log.msg("WriteBucket [%s]: creating bucket %s"
-        #        % (idlib.b2a(verifierid), bucket_dir))
-        os.mkdir(bucket_dir)
-
-        self._open = True
-        self._size = size
-        self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
-        self._bytes_written = 0
-
-        self._write_attr('bucket_num', str(bucket_num))
-
-    def set_leaser(self, leaser):
-        self._write_attr('leases', leaser)
-
-    def write(self, data):
-        precondition(self._open)
-        precondition(len(data) + self._bytes_written <= self._size)
-        self._data.write(data)
-        self._data.flush()
-        self._bytes_written += len(data)
-
-    def set_metadata(self, metadata):
-        precondition(self._open)
-        self._write_attr('metadata', metadata)
-
-    def close(self):
-        precondition(self._bytes_written == self._size)
-        #log.msg("WriteBucket.close [%s] (%s)"
-        #        % (idlib.b2a(self._verifierid), self._bucket_dir))
-        self._data.close()
-        self._write_attr('closed', '')
-        self._open = False
-
-    def is_complete(self):
-        complete = Bucket.is_complete(self)
-        if complete:
-            _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
-        return complete
-
-class ReadBucket(Bucket, Referenceable):
-    implements(RIBucketReader)
-
-    def __init__(self, bucket_dir, verifierid):
-        Bucket.__init__(self, bucket_dir, verifierid)
-        precondition(self.is_complete()) # implicitly asserts bucket_dir exists
-
-    def get_bucket_num(self):
-        return int(self._read_attr('bucket_num'))
-
-    def read(self):
-        return self._read_attr('data')
-    remote_read = read
-
-    def get_metadata(self):
-        return self._read_attr('metadata')
-    remote_get_metadata = get_metadata
diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py
index f75235f6..7cbd9f4d 100644
--- a/src/allmydata/codec.py
+++ b/src/allmydata/codec.py
@@ -42,6 +42,9 @@ class ReplicatingEncoder(object):
     def get_share_size(self):
         return self.data_size
 
+    def get_block_size(self):
+        return self.data_size
+
     def encode(self, inshares, desired_shareids=None):
         assert isinstance(inshares, list)
         for inshare in inshares:
@@ -59,7 +62,7 @@ class ReplicatingDecoder(object):
     def set_serialized_params(self, params):
         self.required_shares = int(params)
 
-    def get_required_shares(self):
+    def get_needed_shares(self):
         return self.required_shares
 
     def decode(self, some_shares, their_shareids):
@@ -97,6 +100,9 @@ class CRSEncoder(object):
     def get_share_size(self):
         return self.share_size
 
+    def get_block_size(self):
+        return self.share_size
+
     def encode(self, inshares, desired_share_ids=None):
         precondition(desired_share_ids is None or len(desired_share_ids) <= self.max_shares, desired_share_ids, self.max_shares)
 
@@ -129,7 +135,7 @@ class CRSDecoder(object):
             print "max_shares: %d" % self.max_shares
             print "required_shares: %d" % self.required_shares
 
-    def get_required_shares(self):
+    def get_needed_shares(self):
         return self.required_shares
 
     def decode(self, some_shares, their_shareids):
diff --git a/src/allmydata/debugshell.py b/src/allmydata/debugshell.py
index b075e1f5..9e3145f2 100644
--- a/src/allmydata/debugshell.py
+++ b/src/allmydata/debugshell.py
@@ -1,21 +1,5 @@
-import os
 
 # 'app' is overwritten by manhole when the connection is established. We set
 # it to None now to keep pyflakes from complaining.
 app = None
 
-def get_random_bucket_on(nodeid, size=200):
-    d = app.get_remote_service(nodeid, 'storageserver')
-    def get_bucket(rss):
-        return rss.callRemote('allocate_bucket',
-                              verifierid=os.urandom(20),
-                              bucket_num=26,
-                              size=size,
-                              leaser=app.tub.tubID,
-                              )
-    d.addCallback(get_bucket)
-    return d
-
-def write_to_bucket(bucket, bytes=100):
-    return bucket.callRemote('write', data=os.urandom(bytes))
-
diff --git a/src/allmydata/download.py b/src/allmydata/download.py
index f08f3a28..d7ed7d9b 100644
--- a/src/allmydata/download.py
+++ b/src/allmydata/download.py
@@ -1,13 +1,14 @@
 
-import os, sha
+import os, random, sha
 from zope.interface import implements
-from twisted.python import failure, log
+from twisted.python import log
 from twisted.internet import defer
 from twisted.application import service
 
-from allmydata.util import idlib, bencode
+from allmydata.util import idlib, bencode, mathutil
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata import codec
+from allmydata.Crypto.Cipher import AES
 from allmydata.uri import unpack_uri
 from allmydata.interfaces import IDownloadTarget, IDownloader
 
@@ -18,92 +19,187 @@ class HaveAllPeersError(Exception):
     # we use this to jump out of the loop
     pass
 
+
+class Output:
+    def __init__(self, downloadable, key):
+        self.downloadable = downloadable
+        self._decryptor = AES.new(key=key, mode=AES.MODE_CTR,
+                                  counterstart="\x00"*16)
+        self._verifierid_hasher = sha.new(netstring("allmydata_v1_verifierid"))
+        self._fileid_hasher = sha.new(netstring("allmydata_v1_fileid"))
+    def write(self, crypttext):
+        self._verifierid_hasher.update(crypttext)
+        plaintext = self._decryptor.decrypt(crypttext)
+        self._fileid_hasher.update(plaintext)
+        self.downloadable.write(plaintext)
+    def finish(self):
+        self.downloadable.close()
+        return self.downloadable.finish()
+
+class BlockDownloader:
+    def __init__(self, bucket, blocknum, parent):
+        self.bucket = bucket
+        self.blocknum = blocknum
+        self.parent = parent
+        
+    def start(self, segnum):
+        d = self.bucket.callRemote('get_block', segnum)
+        d.addCallbacks(self._hold_block, self._got_block_error)
+        return d
+
+    def _hold_block(self, data):
+        self.parent.hold_block(self.blocknum, data)
+
+    def _got_block_error(self, f):
+        self.parent.bucket_failed(self.blocknum, self.bucket)
+
+class SegmentDownloader:
+    def __init__(self, segmentnumber, needed_shares):
+        self.segmentnumber = segmentnumber
+        self.needed_blocks = needed_shares
+        self.blocks = {} # k: blocknum, v: data
+
+    def start(self):
+        return self._download()
+
+    def _download(self):
+        d = self._try()
+        def _done(res):
+            if len(self.blocks) >= self.needed_blocks:
+                return self.blocks
+            else:
+                return self._download()
+        d.addCallback(_done)
+        return d
+
+    def _try(self):
+        while len(self.parent.active_buckets) < self.needed_blocks:
+            # need some more
+            otherblocknums = list(set(self.parent._share_buckets.keys()) - set(self.parent.active_buckets.keys()))
+            if not otherblocknums:
+                raise NotEnoughPeersError
+            blocknum = random.choice(otherblocknums)
+            self.parent.active_buckets[blocknum] = random.choice(self.parent._share_buckets[blocknum])
+
+        # Now we have enough buckets, in self.parent.active_buckets.
+        l = []
+        for blocknum, bucket in self.parent.active_buckets.iteritems():
+            bd = BlockDownloader(bucket, blocknum, self)
+            d = bd.start(self.segmentnumber)
+            l.append(d)
+        return defer.DeferredList(l)
+
+    def hold_block(self, blocknum, data):
+        self.blocks[blocknum] = data
+
+    def bucket_failed(self, shnum, bucket):
+        del self.parent.active_buckets[shnum]
+        s = self.parent._share_buckets[shnum]
+        s.remove(bucket)
+        if not s:
+            del self.parent._share_buckets[shnum]
+        
 class FileDownloader:
     debug = False
 
-    def __init__(self, peer, uri):
-        self._peer = peer
-        (codec_name, codec_params, verifierid) = unpack_uri(uri)
+    def __init__(self, client, uri, downloadable):
+        self._client = client
+        self._downloadable = downloadable
+        (codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size) = unpack_uri(uri)
         assert isinstance(verifierid, str)
         assert len(verifierid) == 20
         self._verifierid = verifierid
+        self._roothash = roothash
         self._decoder = codec.get_decoder_by_name(codec_name)
         self._decoder.set_serialized_params(codec_params)
-        self.needed_shares = self._decoder.get_required_shares()
+        self._total_segments = mathutil.div_ceil(size, segment_size)
+        self._current_segnum = 0
+        self._segment_size = segment_size
+        self._needed_shares = self._decoder.get_needed_shares()
 
-    def set_download_target(self, target):
-        self._target = target
-        self._target.register_canceller(self._cancel)
-
-    def _cancel(self):
-        pass
+        # future:
+        # self._share_hash_tree = ??
+        # self._subshare_hash_trees = {} # k:shnum, v: hashtree
+        # each time we start using a new shnum, we must acquire a share hash
+        # from one of the buckets that provides that shnum, then validate it against
+        # the rest of the share hash tree that they provide. Then, each time we
+        # get a block in that share, we must validate the block against the rest
+        # of the subshare hash tree that that bucket will provide.
 
     def start(self):
         log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
         if self.debug:
             print "starting download"
         # first step: who should we download from?
+        self.active_buckets = {} # k: shnum, v: bucket
+        self._share_buckets = {} # k: shnum, v: set of buckets
+
+        key = "\x00" * 16
+        self._output = Output(self._downloadable, key)
+ 
+        d = defer.maybeDeferred(self._get_all_shareholders)
+        d.addCallback(self._got_all_shareholders)
+        d.addCallback(self._download_all_segments)
+        d.addCallback(self._done)
+        return d
 
-        # maybe limit max_peers to 2*len(self.shares), to reduce memory
-        # footprint
-        max_peers = None
+    def _get_all_shareholders(self):
+        dl = []
+        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid):
+            d = connection.callRemote("get_buckets", self._verifierid)
+            d.addCallbacks(self._got_response, self._got_error,
+                           callbackArgs=(connection,))
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def _got_response(self, buckets, connection):
+        for sharenum, bucket in buckets:
+            self._share_buckets.setdefault(sharenum, set()).add(bucket)
+        
+    def _got_error(self, f):
+        self._client.log("Somebody failed. -- %s" % (f,))
 
-        self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers)
-        for p in self.permuted:
-            assert isinstance(p, str)
-        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+    def _got_all_shareholders(self, res):
+        if len(self._share_buckets) < self._needed_shares:
+            raise NotEnoughPeersError
 
-        d = defer.maybeDeferred(self._check_next_peer)
-        d.addCallback(self._got_all_peers)
+        self.active_buckets = {}
+        
+    def _download_all_segments(self):
+        d = self._download_segment(self._current_segnum)
+        def _done(res):
+            if self._current_segnum == self._total_segments:
+                return None
+            return self._download_segment(self._current_segnum)
+        d.addCallback(_done)
         return d
 
-    def _check_next_peer(self):
-        if len(self.permuted) == 0:
-            # there are no more to check
-            raise NotEnoughPeersError
-        peerid = self.permuted.pop(0)
-
-        d = self._peer.get_remote_service(peerid, "storageserver")
-        def _got_peer(service):
-            bucket_num = len(self.landlords)
-            if self.debug: print "asking %s" % idlib.b2a(peerid)
-            d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
-            def _got_response(buckets):
-                if buckets:
-                    bucket_nums = [num for (num,bucket) in buckets]
-                    if self.debug:
-                        print " peerid %s has buckets %s" % (idlib.b2a(peerid),
-                                                             bucket_nums)
-
-                    self.landlords.append( (peerid, buckets) )
-                if len(self.landlords) >= self.needed_shares:
-                    if self.debug: print " we're done!"
-                    raise HaveAllPeersError
-                # otherwise we fall through to search more peers
-            d2.addCallback(_got_response)
-            return d2
-        d.addCallback(_got_peer)
-
-        def _done_with_peer(res):
-            if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
-            if isinstance(res, failure.Failure):
-                if res.check(HaveAllPeersError):
-                    if self.debug: print " all done"
-                    # we're done!
-                    return
-                if res.check(IndexError):
-                    if self.debug: print " no connection"
-                else:
-                    if self.debug: print " other error:", res
+    def _download_segment(self, segnum):
+        segmentdler = SegmentDownloader(segnum, self._needed_shares)
+        d = segmentdler.start()
+        d.addCallback(self._decoder.decode)
+        def _done(res):
+            self._current_segnum += 1
+            if self._current_segnum == self._total_segments:
+                data = ''.join(res)
+                padsize = mathutil.pad_size(self._size, self._segment_size)
+                data = data[:-padsize]
+                self.output.write(data)
             else:
-                if self.debug: print " they had data for us"
-            # we get here for either good peers (when we still need more), or
-            # after checking a bad peer (and thus still need more). So now we
-            # need to grab a new peer.
-            return self._check_next_peer()
-        d.addBoth(_done_with_peer)
+                for buf in res:
+                    self.output.write(buf)
+        d.addCallback(_done)
         return d
 
+    def _done(self, res):
+        return self._output.finish()
+        
+    def _write_data(self, data):
+        self._verifierid_hasher.update(data)
+        
+        
+
+# old stuff
     def _got_all_peers(self, res):
         all_buckets = []
         for peerid, buckets in self.landlords:
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 1cd60a7e..1cfb5dae 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -57,7 +57,11 @@ class RIBucketWriter(RemoteInterface):
 class RIStorageServer(RemoteInterface):
     def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int),
                          sharesize=int, blocksize=int, canary=Referenceable_):
-        # if the canary is lost before close(), the bucket is deleted
+        """
+        @param canary: If the canary is lost before close(), the bucket is deleted.
+        @return: tuple of (alreadygot, allocated), where alreadygot is what we
+            already have and is what we hereby agree to accept
+        """
         return TupleOf(SetOf(int), DictOf(int, RIBucketWriter))
     def get_buckets(verifierid=Verifierid):
         return DictOf(int, RIBucketReader_)
@@ -66,7 +70,7 @@ class RIBucketReader(RemoteInterface):
     def get_block(blocknum=int):
         return ShareData
     def get_block_hashes():
-        return ListOf(Hash))
+        return ListOf(Hash)
     def get_share_hashes():
         return ListOf(TupleOf(int, Hash))
 
@@ -157,6 +161,10 @@ class ICodecEncoder(Interface):
         compatible decoder.
         """
 
+    def get_block_size():
+        """Return the length of the shares that encode() will produce.
+        """
+
     def get_share_size():
         """Return the length of the shares that encode() will produce.
         """
@@ -271,7 +279,7 @@ class ICodecDecoder(Interface):
         """Set up the parameters of this encoder, from a string returned by
         encoder.get_serialized_params()."""
 
-    def get_required_shares():
+    def get_needed_shares():
         """Return the number of shares needed to reconstruct the data.
         set_serialized_params() is required to be called before this."""
 
diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py
index 4286accb..bb9622d1 100644
--- a/src/allmydata/storageserver.py
+++ b/src/allmydata/storageserver.py
@@ -1,4 +1,4 @@
-import os
+import os, re
 
 from foolscap import Referenceable
 from twisted.application import service
@@ -10,7 +10,7 @@ from allmydata.util import bencode, fileutil, idlib
 from allmydata.util.assertutil import _assert, precondition
 
 # store/
-# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
+# store/incoming # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
 # store/$VERIFIERID
 # store/$VERIFIERID/$SHARENUM
 # store/$VERIFIERID/$SHARENUM/blocksize
@@ -24,20 +24,20 @@ NUM_RE=re.compile("[1-9][0-9]*")
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)
 
-    def __init__(self, tmphome, finalhome, blocksize):
-        self.tmphome = tmphome
+    def __init__(self, incominghome, finalhome, blocksize):
+        self.incominghome = incominghome
         self.finalhome = finalhome
         self.blocksize = blocksize
         self.closed = False
         self._write_file('blocksize', str(blocksize))
 
     def _write_file(self, fname, data):
-        open(os.path.join(tmphome, fname), 'wb').write(data)
+        open(os.path.join(self.incominghome, fname), 'wb').write(data)
 
     def remote_put_block(self, segmentnum, data):
         precondition(not self.closed)
         assert len(data) == self.blocksize
-        f = open(os.path.join(self.tmphome, 'data'), 'wb')
+        f = open(os.path.join(self.incominghome, 'data'), 'wb')
         f.seek(self.blocksize*segmentnum)
         f.write(data)
 
@@ -54,7 +54,7 @@ class BucketWriter(Referenceable):
     def close(self):
         precondition(not self.closed)
         # TODO assert or check the completeness and consistency of the data that has been written
-        fileutil.rename(self.tmphome, self.finalhome)
+        fileutil.rename(self.incominghome, self.finalhome)
         self.closed = True
 
 def str2l(s):
@@ -87,24 +87,28 @@ class StorageServer(service.MultiService, Referenceable):
     def __init__(self, storedir):
         fileutil.make_dirs(storedir)
         self.storedir = storedir
-        self.tmpdir = os.path.join(storedir, 'tmp')
-        self._clean_trash()
-        fileutil.make_dirs(self.tmpdir)
+        self.incomingdir = os.path.join(storedir, 'incoming')
+        self._clean_incomplete()
+        fileutil.make_dirs(self.incomingdir)
 
         service.MultiService.__init__(self)
 
-    def _clean_trash(self):
-        fileutil.rm_dir(self.tmpdir)
+    def _clean_incomplete(self):
+        fileutil.rm_dir(self.incomingdir)
 
     def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
                                 blocksize, canary):
-        bucketwriters = {} # k: sharenum, v: BucketWriter
-        for sharenum in sharenums:
-            tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum)
-            finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum)
-            bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize)
+        alreadygot = set()
+        bucketwriters = {} # k: shnum, v: BucketWriter
+        for shnum in sharenums:
+            incominghome = os.path.join(self.incomingdir, idlib.a2b(verifierid), "%d"%shnum)
+            finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%shnum)
+            if os.path.exists(incominghome) or os.path.exists(finalhome):
+                alreadygot.add(shnum)
+            else:
+                bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
             
-        return bucketwriters
+        return alreadygot, bucketwriters
 
     def remote_get_buckets(self, verifierid):
         bucketreaders = {} # k: sharenum, v: BucketReader
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 686f23b0..0328ae9e 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -6,221 +6,19 @@ from cStringIO import StringIO
 from allmydata import upload
 from allmydata.uri import unpack_uri
 
-class StringBucketProxy:
-    # This is for unit tests: make a StringIO look like a RIBucketWriter.
-
-    def __init__(self):
-        self.data = StringIO()
-        self.size = None
-        self.done = False
-
-    def callRemote(self, methname, **kwargs):
-        if methname == "write":
-            return defer.maybeDeferred(self.write, **kwargs)
-        elif methname == "close":
-            return defer.maybeDeferred(self.close, **kwargs)
-        else:
-            return defer.fail(NameError("no such method named %s" % methname))
-
-    def write(self, data):
-        self.data.write(data)
-    def close(self):
-        self.done = True
-
-
-class FakePeer:
-    def __init__(self, peerid, response):
-        self.peerid = peerid
-        self.response = response
-
-    def callRemote(self, methname, *args, **kwargs):
-        assert not args
-        return defer.maybeDeferred(self._callRemote, methname, **kwargs)
-
-    def _callRemote(self, methname, **kwargs):
-        assert methname == "allocate_bucket"
-        #assert kwargs["size"] == 100
-        assert kwargs["leaser"] == "fakeclient"
-        if self.response == "good":
-            return self
-        raise upload.TooFullError()
+class FakeStorageServer:
+    pass
 
 class FakeClient:
-    nodeid = "fakeclient"
-    def __init__(self, responses):
-        self.peers = []
-        for peerid,r in enumerate(responses):
-            if r == "disconnected":
-                self.peers.append(None)
-            else:
-                self.peers.append(FakePeer(str(peerid), r))
-
-    def get_permuted_connections(self, key):
-        return [str(i) for i in range(len(self.peers))]
-
-    def get_remote_service(self, peerid, name):
-        peer = self.peers[int(peerid)]
-        if not peer:
-            return defer.fail(IndexError("no connection to that peer"))
-        return defer.succeed(peer)
-
-
-class NextPeerUploader(upload.FileUploader):
-    _size = 100
-    def _got_enough_peers(self, res):
-        return res
-
-class NextPeer(unittest.TestCase):
-    responses = ["good", # 0
-                 "full", # 1
-                 "full", # 2
-                 "disconnected", # 3
-                 "good", # 4
-                 ]
-
-    def compare_landlords(self, u, c, expected):
-        exp = [(str(peerid), bucketnum, c.peers[peerid])
-               for peerid, bucketnum in expected]
-        self.failUnlessEqual(u.landlords, exp)
-
-    VERIFIERID = "\x00" * 20
-    def test_0(self):
-        c = FakeClient([])
-        u = NextPeerUploader(c)
-        u.set_verifierid(self.VERIFIERID)
-        u.set_params(2, 2, 2)
-        d = u.start()
-        def _check(f):
-            f.trap(upload.NotEnoughPeersError)
-        d.addCallbacks(lambda res: self.fail("this was supposed to fail"),
-                       _check)
-        return d
-
-    def test_1(self):
-        c = FakeClient(self.responses)
-        u = NextPeerUploader(c)
-        u.set_verifierid(self.VERIFIERID)
-        u.set_params(2, 2, 2)
-        d = u.start()
-        def _check(res):
-            self.failUnlessEqual(u.goodness_points, 2)
-            self.compare_landlords(u, c, [(0, 0),
-                                          (4, 1),
-                                          ])
-        d.addCallback(_check)
-        return d
-
-    def test_2(self):
-        c = FakeClient(self.responses)
-        u = NextPeerUploader(c)
-        u.set_verifierid(self.VERIFIERID)
-        u.set_params(3, 3, 3)
-        d = u.start()
-        def _check(res):
-            self.failUnlessEqual(u.goodness_points, 3)
-            self.compare_landlords(u, c, [(0, 0),
-                                          (4, 1),
-                                          (0, 2),
-                                          ])
-        d.addCallback(_check)
-        return d
-
-    responses2 = ["good", # 0
-                 "full", # 1
-                 "full", # 2
-                 "good", # 3
-                 "full", # 4
-                 ]
-
-    def test_3(self):
-        c = FakeClient(self.responses2)
-        u = NextPeerUploader(c)
-        u.set_verifierid(self.VERIFIERID)
-        u.set_params(3, 3, 3)
-        d = u.start()
-        def _check(res):
-            self.failUnlessEqual(u.goodness_points, 3)
-            self.compare_landlords(u, c, [(0, 0),
-                                          (3, 1),
-                                          (0, 2),
-                                          ])
-        d.addCallback(_check)
-        return d
-
-    responses3 = ["good", # 0
-                 "good", # 1
-                 "good", # 2
-                 "good", # 3
-                 "good", # 4
-                 ]
-
-    def test_4(self):
-        c = FakeClient(self.responses3)
-        u = NextPeerUploader(c)
-        u.set_verifierid(self.VERIFIERID)
-        u.set_params(4, 4, 4)
-        d = u.start()
-        def _check(res):
-            self.failUnlessEqual(u.goodness_points, 4)
-            self.compare_landlords(u, c, [(0, 0),
-                                          (1, 1),
-                                          (2, 2),
-                                          (3, 3),
-                                          ])
-        d.addCallback(_check)
-        return d
-
-
-class FakePeer2:
-    def __init__(self, peerid):
-        self.peerid = peerid
-        self.data = ""
-
-    def callRemote(self, methname, *args, **kwargs):
-        if methname == "allocate_bucket":
-            return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
-        if methname == "write":
-            return defer.maybeDeferred(self._write, *args, **kwargs)
-        if methname == "set_metadata":
-            return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
-        if methname == "close":
-            return defer.maybeDeferred(self._close, *args, **kwargs)
-        return defer.maybeDeferred(self._bad_name, methname)
-
-    def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
-        self.allocated_size = size
-        return self
-    def _write(self, data):
-        self.data = self.data + data
-    def _set_metadata(self, metadata):
-        self.metadata = metadata
-    def _close(self):
-        pass
-    def _bad_name(self, methname):
-        raise NameError("FakePeer2 has no such method named '%s'" % methname)
-
-class FakeClient2:
-    nodeid = "fakeclient"
-    def __init__(self, num_peers):
-        self.peers = []
-        for peerid in range(num_peers):
-            self.peers.append(FakePeer2(str(peerid)))
-
-    def get_permuted_connections(self, key):
-        return [str(i) for i in range(len(self.peers))]
-
-    def get_remote_service(self, peerid, name):
-        peer = self.peers[int(peerid)]
-        if not peer:
-            return defer.fail(IndexError("no connection to that peer"))
-        return defer.succeed(peer)
+    def get_permuted_peers(self, verifierid):
+        return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ]
 
 class Uploader(unittest.TestCase):
     def setUp(self):
-        node = self.node = FakeClient2(10)
-        u = self.u = upload.Uploader()
-        u.running = 1
-        u.parent = node
+        self.node = FakeClient()
+        self.u = upload.Uploader()
+        self.u.running = True
+        self.u.parent = self.node
 
     def _check(self, uri):
         self.failUnless(isinstance(uri, str))
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index c994c9c0..d0f36c20 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -84,15 +84,19 @@ class FileUploader:
 
         # create the encoder, so we can know how large the shares will be
         self._encoder = self.ENCODERCLASS()
+        self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others.
         self._codec_name = self._encoder.get_encoder_type()
+        self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares)
+xyz
+
         paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
-        self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares)
-        self._share_size = self._encoder.get_share_size()
+
+        self._block_size = self._encoder.get_block_size()
 
         # first step: who should we upload to?
         peers = self._client.get_permuted_peers(self._verifierid)
         assert peers
-        trackers = [ (permutedid, PeerTracker(peerid, conn),)
+        trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),)
                      for permutedid, peerid, conn in peers ]
         ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
         ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
@@ -192,7 +196,7 @@ class FileUploader:
 
     def _compute_uri(self, roothash):
         params = self._encoder.get_serialized_params()
-        return pack_uri(self._codec_name, params, self._verifierid, roothash)
+        return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size)
 
 
 def netstring(s):
diff --git a/src/allmydata/uri.py b/src/allmydata/uri.py
index 3a96d2c1..4f2f0d9d 100644
--- a/src/allmydata/uri.py
+++ b/src/allmydata/uri.py
@@ -5,7 +5,7 @@ from allmydata.util import idlib
 # enough information to retrieve and validate the contents. It shall be
 # expressed in a limited character set (namely [TODO]).
 
-def pack_uri(codec_name, codec_params, verifierid):
+def pack_uri(codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size):
     assert isinstance(codec_name, str)
     assert len(codec_name) < 10
     assert ":" not in codec_name
@@ -13,13 +13,18 @@ def pack_uri(codec_name, codec_params, verifierid):
     assert ":" not in codec_params
     assert isinstance(verifierid, str)
     assert len(verifierid) == 20 # sha1 hash
-    return "URI:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid))
+    return "URI:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size)
 
 
 def unpack_uri(uri):
     assert uri.startswith("URI:")
-    header, codec_name, codec_params, verifierid_s = uri.split(":")
+    header, codec_name, codec_params, verifierid_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":")
     verifierid = idlib.a2b(verifierid_s)
-    return codec_name, codec_params, verifierid
+    roothash = idlib.a2b(roothash_s)
+    needed_shares = idlib.a2b(needed_shares_s)
+    total_shares = idlib.a2b(total_shares_s)
+    size = int(size_s)
+    segment_size = int(segment_size_s)
+    return codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size
 
 
-- 
2.45.2