From: Brian Warner <>
Date: Fri, 13 Jul 2007 21:04:49 +0000 (-0700)
Subject: storage: use one file per share instead of 7 (#85). work-in-progress, tests still... 

storage: use one file per share instead of 7 (#85). work-in-progress, tests still fail

diff --git a/src/allmydata/ b/src/allmydata/
index 008daf05..e048f1d5 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -102,8 +102,17 @@ class ValidatedBucket:
         self.share_hash_tree = share_hash_tree
         self._roothash = roothash
         self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
+        self.started = False
     def get_block(self, blocknum):
+        if not self.started:
+            d = self.bucket.start()
+            def _started(res):
+                self.started = True
+                return self.get_block(blocknum)
+            d.addCallback(_started)
+            return d
         # the first time we use this bucket, we need to fetch enough elements
         # of the share hash tree to validate it from our share hash up to the
         # hashroot.
@@ -380,7 +389,8 @@ class FileDownloader:
         bucket = sources[0]
         sources = sources[1:]
         #d = bucket.callRemote(methname, *args)
-        d = getattr(bucket, methname)(*args)
+        d = bucket.startIfNecessary()
+        d.addCallback(lambda res: getattr(bucket, methname)(*args))
         d.addCallback(validatorfunc, bucket)
         def _bad(f):
             log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD
diff --git a/src/allmydata/ b/src/allmydata/
index 830b5b63..a36b71a6 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -180,6 +180,9 @@ class Encoder(object):
         self.setup_codec() # TODO: duplicate call?
         d = defer.succeed(None)
+        for l in self.landlords.values():
+            d.addCallback(lambda res, l=l: l.start())
         for i in range(self.num_segments-1):
             # note to self: this form doesn't work, because lambda only
             # captures the slot, not the value
diff --git a/src/allmydata/ b/src/allmydata/
index 731c22ce..9a1f2145 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -61,6 +61,38 @@ class RIClient(RemoteInterface):
         return Nodeid
 class RIBucketWriter(RemoteInterface):
+    def write(offset=int, data=ShareData):
+        return None
+    def close():
+        """
+        If the data that has been written is incomplete or inconsistent then
+        the server will throw the data away, else it will store it for future
+        retrieval.
+        """
+        return None
+class RIBucketReader(RemoteInterface):
+    def read(offset=int, length=int):
+        return ShareData
+class RIStorageServer(RemoteInterface):
+    def allocate_buckets(storage_index=StorageIndex,
+                         sharenums=SetOf(int, maxLength=MAX_BUCKETS),
+                         sharesize=int, blocksize=int, canary=Referenceable):
+        """
+        @param canary: If the canary is lost before close(), the bucket is deleted.
+        @return: tuple of (alreadygot, allocated), where alreadygot is what we
+            already have and is what we hereby agree to accept
+        """
+        return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
+                       DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
+    def get_buckets(storage_index=StorageIndex):
+        return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
+class IStorageBucketWriter(Interface):
     def put_block(segmentnum=int, data=ShareData):
         """@param data: For most segments, this data will be 'blocksize'
         bytes in length. The last segment might be shorter.
@@ -92,16 +124,11 @@ class RIBucketWriter(RemoteInterface):
             write(k + ':' + netstring(dict[k]))
         return None
     def close():
-        """
-        If the data that has been written is incomplete or inconsistent then
-        the server will throw the data away, else it will store it for future
-        retrieval.
-        """
-        return None
+        pass
+class IStorageBucketReader(Interface):
-class RIBucketReader(RemoteInterface):
     def get_block(blocknum=int):
         """Most blocks will be the same size. The last block might be shorter
         than the others.
@@ -121,55 +148,6 @@ class RIBucketReader(RemoteInterface):
         return URIExtensionData
-class RIStorageServer(RemoteInterface):
-    def allocate_buckets(storage_index=StorageIndex,
-                         sharenums=SetOf(int, maxLength=MAX_BUCKETS),
-                         sharesize=int, blocksize=int, canary=Referenceable):
-        """
-        @param canary: If the canary is lost before close(), the bucket is deleted.
-        @return: tuple of (alreadygot, allocated), where alreadygot is what we
-            already have and is what we hereby agree to accept
-        """
-        return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
-                       DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
-    def get_buckets(storage_index=StorageIndex):
-        return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
-class IStorageBucketWriter(Interface):
-    def put_block(segmentnum, data):
-        pass
-    def put_plaintext_hashes(hashes):
-        pass
-    def put_crypttext_hashes(hashes):
-        pass
-    def put_block_hashes(blockhashes):
-        pass
-    def put_share_hashes(sharehashes):
-        pass
-    def put_uri_extension(data):
-        pass
-    def close():
-        pass
-class IStorageBucketReader(Interface):
-    def get_block(blocknum):
-        pass
-    def get_plaintext_hashes():
-        pass
-    def get_crypttext_hashes():
-        pass
-    def get_block_hashes():
-        pass
-    def get_share_hashes():
-        pass
-    def get_uri_extension():
-        pass
 # hm, we need a solution for forward references in schemas
 from foolscap.schema import Any
diff --git a/src/allmydata/ b/src/allmydata/
index 66cf33e0..4cc53e13 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -1,13 +1,14 @@
-import os, re, weakref
+import os, re, weakref, stat, struct
 from foolscap import Referenceable
 from twisted.application import service
+from twisted.internet import defer
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
      RIBucketReader, IStorageBucketWriter, IStorageBucketReader
 from allmydata import interfaces
-from allmydata.util import bencode, fileutil, idlib
+from allmydata.util import fileutil, idlib
 from allmydata.util.assertutil import precondition
 # store/
@@ -25,110 +26,46 @@ NUM_RE=re.compile("[0-9]*")
 class BucketWriter(Referenceable):
-    def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
+    def __init__(self, ss, incominghome, finalhome, size): = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
-        self.blocksize = blocksize
-        self.sharesize = sharesize
+        self._size = size
         self.closed = False
-        self._next_segnum = 0
-        fileutil.make_dirs(incominghome)
-        self._write_file('blocksize', str(blocksize))
+        # touch the file, so later callers will see that we're working on it
+        f = open(self.incominghome, 'ab')
+        f.close()
     def allocated_size(self):
-        return self.sharesize
+        return self._size
-    def _write_file(self, fname, data):
-        open(os.path.join(self.incominghome, fname), 'wb').write(data)
-    def remote_put_block(self, segmentnum, data):
+    def remote_write(self, offset, data):
         precondition(not self.closed)
-        # all blocks but the last will be of size self.blocksize, however the
-        # last one may be short, and we don't know the total number of
-        # segments so we can't tell which is which.
-        assert len(data) <= self.blocksize
-        assert segmentnum == self._next_segnum # must write in sequence
-        self._next_segnum = segmentnum + 1
-        f = fileutil.open_or_create(os.path.join(self.incominghome, 'data'))
+        precondition(offset >= 0)
+        precondition(offset+len(data) <= self._size)
+        f = open(self.incominghome, 'ab')
-    def remote_put_plaintext_hashes(self, hashes):
-        precondition(not self.closed)
-        # TODO: verify the length of blockhashes.
-        # TODO: tighten foolscap schema to require exactly 32 bytes.
-        self._write_file('plaintext_hashes', ''.join(hashes))
-    def remote_put_crypttext_hashes(self, hashes):
-        precondition(not self.closed)
-        # TODO: verify the length of blockhashes.
-        # TODO: tighten foolscap schema to require exactly 32 bytes.
-        self._write_file('crypttext_hashes', ''.join(hashes))
-    def remote_put_block_hashes(self, blockhashes):
-        precondition(not self.closed)
-        # TODO: verify the length of blockhashes.
-        # TODO: tighten foolscap schema to require exactly 32 bytes.
-        self._write_file('blockhashes', ''.join(blockhashes))
-    def remote_put_share_hashes(self, sharehashes):
-        precondition(not self.closed)
-        self._write_file('sharehashes', bencode.bencode(sharehashes))
-    def remote_put_uri_extension(self, data):
-        precondition(not self.closed)
-        self._write_file('uri_extension', data)
+        f.close()
     def remote_close(self):
         precondition(not self.closed)
-        # TODO assert or check the completeness and consistency of the data that has been written
-        fileutil.make_dirs(os.path.dirname(self.finalhome))
         fileutil.rename(self.incominghome, self.finalhome)
-        try:
-            os.rmdir(os.path.dirname(self.incominghome))
-        except OSError:
-            # Perhaps the directory wasn't empty.  In any case, ignore the error.
-            pass
         self.closed = True
-, fileutil.du(self.finalhome))
+        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
+, filelen)
-def str2l(s):
-    """ split string (pulled from storage) into a list of blockids """
-    return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ]
 class BucketReader(Referenceable):
     def __init__(self, home):
         self.home = home
-        self.blocksize = int(self._read_file('blocksize'))
-    def _read_file(self, fname):
-        return open(os.path.join(self.home, fname), 'rb').read()
-    def remote_get_block(self, blocknum):
-        f = open(os.path.join(self.home, 'data'), 'rb')
- * blocknum)
-        return # this might be short for the last block
-    def remote_get_plaintext_hashes(self):
-        return str2l(self._read_file('plaintext_hashes'))
-    def remote_get_crypttext_hashes(self):
-        return str2l(self._read_file('crypttext_hashes'))
-    def remote_get_block_hashes(self):
-        return str2l(self._read_file('blockhashes'))
-    def remote_get_share_hashes(self):
-        hashes = bencode.bdecode(self._read_file('sharehashes'))
-        # tuples come through bdecode(bencode()) as lists, which violates the
-        # schema
-        return [tuple(i) for i in hashes]
-    def remote_get_uri_extension(self):
-        return self._read_file('uri_extension')
+    def remote_read(self, offset, length):
+        f = open(self.home, 'rb')
+        return
 class StorageServer(service.MultiService, Referenceable):
@@ -159,7 +96,7 @@ class StorageServer(service.MultiService, Referenceable):
         return space
     def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
-                                blocksize, canary):
+                                canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         si_s = idlib.b2a(storage_index)
@@ -174,8 +111,9 @@ class StorageServer(service.MultiService, Referenceable):
             if os.path.exists(incominghome) or os.path.exists(finalhome):
             elif no_limits or remaining_space >= space_per_bucket:
+                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                 bw = BucketWriter(self, incominghome, finalhome,
-                                  blocksize, space_per_bucket)
+                                  space_per_bucket)
                 bucketwriters[shnum] = bw
                 self._active_writers[bw] = 1
                 if yes_limits:
@@ -184,6 +122,9 @@ class StorageServer(service.MultiService, Referenceable):
                 # not enough space to accept this bucket
+        if bucketwriters:
+            fileutil.make_dirs(os.path.join(self.storedir, si_s))
         return alreadygot, bucketwriters
     def bucket_writer_closed(self, bw, consumed_size):
@@ -204,24 +145,127 @@ class StorageServer(service.MultiService, Referenceable):
         return bucketreaders
+Share data is written into a single file. At the start of the file, there is
+a series of four-byte big-endian offset values, which indicate where each
+section starts. Each offset is measured from the beginning of the file.
+0x00: segment size
+0x04: offset of data (=00 00 00 1c)
+0x08: offset of plaintext_hash_tree
+0x0c: offset of crypttext_hash_tree
+0x10: offset of block_hashes
+0x14: offset of share_hashes
+0x18: offset of uri_extension_length + uri_extension
+0x1c: start of data
+      start of plaintext_hash_tree
+      start of crypttext_hash_tree
+      start of block_hashes
+      start of share_hashes
+       each share_hash is written as a two-byte (big-endian) hashnum
+       followed by the 32-byte SHA-256 hash. We only store the hashes
+       necessary to validate the share hash root
+      start of uri_extension_length (four-byte big-endian value)
+      start of uri_extension
 class WriteBucketProxy:
-    def __init__(self, rref):
+    def __init__(self, rref, data_size, segment_size, num_segments,
+                 num_share_hashes):
         self._rref = rref
+        self._segment_size = segment_size
+        HASH_SIZE = interfaces.HASH_SIZE
+        self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
+        # how many share hashes are included in each share? This will be
+        # about ln2(num_shares).
+        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        offsets = self._offsets = {}
+        x = 0x1c
+        offsets['data'] = x
+        x += data_size
+        offsets['plaintext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['crypttext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['block_hashes'] = x
+        x += self._segment_hash_size
+        offsets['share_hashes'] = x
+        x += self._share_hash_size
+        offsets['uri_extension'] = x
+        offset_data = struct.pack(">LLLLLLL",
+                                  segment_size,
+                                  offsets['data'],
+                                  offsets['plaintext_hash_tree'],
+                                  offsets['crypttext_hash_tree'],
+                                  offsets['block_hashes'],
+                                  offsets['share_hashes'],
+                                  offsets['uri_extension']
+                                  )
+        assert len(offset_data) == 7*4
+        self._offset_data = offset_data
+    def start(self):
+        return self._write(0, self._offset_data)
     def put_block(self, segmentnum, data):
-        return self._rref.callRemote("put_block", segmentnum, data)
+        offset = self._offsets['data'] + segmentnum * self._segment_size
+        assert offset + len(data) <= self._offsets['uri_extension']
+        assert isinstance(data, str)
+        if segmentnum < self._segment_size-1:
+            assert len(data) == self._segment_size
+        else:
+            assert len(data) <= self._segment_size
+        return self._write(offset, data)
     def put_plaintext_hashes(self, hashes):
-        return self._rref.callRemote("put_plaintext_hashes", hashes)
+        offset = self._offsets['plaintext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['crypttext_hash_tree']
+        return self._write(offset, data)
     def put_crypttext_hashes(self, hashes):
-        return self._rref.callRemote("put_crypttext_hashes", hashes)
+        offset = self._offsets['crypttext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['block_hashes']
+        return self._write(offset, data)
     def put_block_hashes(self, blockhashes):
-        return self._rref.callRemote("put_block_hashes", blockhashes)
+        offset = self._offsets['block_hashes']
+        assert isinstance(blockhashes, list)
+        data = "".join(blockhashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['share_hashes']
+        return self._write(offset, data)
     def put_share_hashes(self, sharehashes):
-        return self._rref.callRemote("put_share_hashes", sharehashes)
+        # sharehashes is a list of (index, hash) tuples, so they get stored
+        # as 2+32=34 bytes each
+        offset = self._offsets['share_hashes']
+        assert isinstance(sharehashes, list)
+        data = "".join([struct.pack(">H", hashnum) + hashvalue
+                        for hashnum,hashvalue in sharehashes])
+        assert len(data) == self._share_hash_size
+        assert offset + len(data) <= self._offsets['uri_extension']
+        return self._write(offset, data)
     def put_uri_extension(self, data):
-        return self._rref.callRemote("put_uri_extension", data)
+        offset = self._offsets['uri_extension']
+        assert isinstance(data, str)
+        length = struct.pack(">L", len(data))
+        return self._write(offset, length+data)
+    def _write(self, offset, data):
+        # TODO: for small shares, buffer the writes and do just a single call
+        return self._rref.callRemote("write", offset, data)
     def close(self):
         return self._rref.callRemote("close")
@@ -230,17 +274,87 @@ class ReadBucketProxy:
     def __init__(self, rref):
         self._rref = rref
+    def startIfNecessary(self):
+        if self._started:
+            return defer.succeed(self)
+        d = self.start()
+        d.addCallback(lambda res: self)
+        return d
+    def start(self):
+        # TODO: for small shares, read the whole bucket in start()
+        d = self._read(0, 7*4)
+        self._offsets = {}
+        def _got_offsets(data):
+            self._segment_size = struct.unpack(">L", data[0:4])[0]
+            x = 4
+            for field in ( 'data',
+                           'plaintext_hash_tree',
+                           'crypttext_hash_tree',
+                           'block_hashes',
+                           'share_hashes',
+                           'uri_extension' ):
+                offset = struct.unpack(">L", data[x:x+4])[0]
+                x += 4
+                self._offsets[field] = offset
+        d.addCallback(_got_offsets)
+        return d
     def get_block(self, blocknum):
-        return self._rref.callRemote("get_block", blocknum)
+        offset = self._offsets['data'] + blocknum * self._segment_size
+        return self._read(offset, self._segment_size)
+    def _str2l(self, s):
+        """ split string (pulled from storage) into a list of blockids """
+        return [ s[i:i+interfaces.HASH_SIZE]
+                 for i in range(0, len(s), interfaces.HASH_SIZE) ]
     def get_plaintext_hashes(self):
-        return self._rref.callRemote("get_plaintext_hashes")
+        offset = self._offsets['plaintext_hash_tree']
+        size = self._offsets['crypttext_hash_tree'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
     def get_crypttext_hashes(self):
-        return self._rref.callRemote("get_crypttext_hashes")
+        offset = self._offsets['crypttext_hash_tree']
+        size = self._offsets['block_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
     def get_block_hashes(self):
-        return self._rref.callRemote("get_block_hashes")
+        offset = self._offsets['block_hashes']
+        size = self._offsets['share_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
     def get_share_hashes(self):
-        return self._rref.callRemote("get_share_hashes")
-    def get_uri_extension(self):
-        return self._rref.callRemote("get_uri_extension")
+        offset = self._offsets['share_hashes']
+        size = self._offsets['uri_extension'] - offset
+        HASH_SIZE = interfaces.HASH_SIZE
+        assert size % (2+HASH_SIZE) == 0
+        d = self._read(offset, size)
+        def _unpack_share_hashes(data):
+            assert len(data) == size
+            hashes = []
+            for i in range(0, size, 2+HASH_SIZE):
+                hashnum = struct.unpack(">H", data[i:i+2])[0]
+                hashvalue = data[i+2:i+2+HASH_SIZE]
+                hashes.append( (hashnum, hashvalue) )
+            return hashes
+        d.addCallback(_unpack_share_hashes)
+        return d
+    def get_uri_extension(self):
+        offset = self._offsets['uri_extension']
+        d = self._read(offset, 4)
+        def _got_length(data):
+            length = struct.unpack(">L", data)[0]
+            return self._read(offset+4, length)
+        d.addCallback(_got_length)
+        return d
+    def _read(self, offset, length):
+        return self._rref.callRemote("read", offset, length)
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 59857d4c..d4a42d0f 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -61,17 +61,10 @@ class FakeBucketWriter:
         self.share_hashes = None
         self.closed = False
-    def callRemote(self, methname, *args, **kwargs):
-        # this allows FakeBucketWriter to be used either as an
-        # IStorageBucketWriter or as the remote reference that it wraps. This
-        # should be cleaned up eventually when we change RIBucketWriter to
-        # have just write(offset, data) and close()
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        d = eventual.fireEventually()
-        d.addCallback(lambda res: _call())
-        return d
+    def startIfNecessary(self):
+        return defer.succeed(self)
+    def start(self):
+        return defer.succeed(self)
     def put_block(self, segmentnum, data):
         def _try():
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 7d02e178..9f76d70f 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -2,18 +2,20 @@
 from twisted.trial import unittest
 from twisted.application import service
+from twisted.internet import defer
 from foolscap import Referenceable
 import os.path
-from allmydata import storageserver
-from allmydata.util import fileutil
+from allmydata import storageserver, interfaces
+from allmydata.util import fileutil, hashutil
 class Bucket(unittest.TestCase):
     def make_workdir(self, name):
-        basedir = os.path.join("test_storage", "Bucket", name)
+        basedir = os.path.join("storage", "Bucket", name)
         incoming = os.path.join(basedir, "tmp", "bucket")
         final = os.path.join(basedir, "bucket")
+        fileutil.make_dirs(os.path.join(basedir, "tmp"))
         return incoming, final
     def bucket_writer_closed(self, bw, consumed):
@@ -21,31 +23,138 @@ class Bucket(unittest.TestCase):
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
-        bw.remote_put_block(0, "a"*25)
-        bw.remote_put_block(1, "b"*25)
-        bw.remote_put_block(2, "c"*7) # last block may be short
+        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw.remote_write(0, "a"*25)
+        bw.remote_write(25, "b"*25)
+        bw.remote_write(50, "c"*25)
+        bw.remote_write(75, "d"*7)
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
-        bw.remote_put_block(0, "a"*25)
-        bw.remote_put_block(1, "b"*25)
-        bw.remote_put_block(2, "c"*7) # last block may be short
-        bw.remote_put_block_hashes(["1"*32, "2"*32, "3"*32, "4"*32])
-        bw.remote_put_share_hashes([(5, "5"*32), (6, "6"*32)])
+        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw.remote_write(0, "a"*25)
+        bw.remote_write(25, "b"*25)
+        bw.remote_write(50, "c"*7) # last block may be short
         # now read from it
         br = storageserver.BucketReader(final)
-        self.failUnlessEqual(br.remote_get_block(0), "a"*25)
-        self.failUnlessEqual(br.remote_get_block(1), "b"*25)
-        self.failUnlessEqual(br.remote_get_block(2), "c"*7)
-        self.failUnlessEqual(br.remote_get_block_hashes(),
-                             ["1"*32, "2"*32, "3"*32, "4"*32])
-        self.failUnlessEqual(br.remote_get_share_hashes(),
-                             [(5, "5"*32), (6, "6"*32)])
+        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
+        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
+        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
+class RemoteBucket:
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(, "remote_" + methname)
+            return meth(*args, **kwargs)
+        return defer.maybeDeferred(_call)
+class BucketProxy(unittest.TestCase):
+    def make_bucket(self, name, size):
+        basedir = os.path.join("storage", "BucketProxy", name)
+        incoming = os.path.join(basedir, "tmp", "bucket")
+        final = os.path.join(basedir, "bucket")
+        fileutil.make_dirs(basedir)
+        fileutil.make_dirs(os.path.join(basedir, "tmp"))
+        bw = storageserver.BucketWriter(self, incoming, final, size)
+        rb = RemoteBucket()
+ = bw
+        return bw, rb, final
+    def bucket_writer_closed(self, bw, consumed):
+        pass
+    def test_create(self):
+        bw, rb, final = self.make_bucket("test_create", 500)
+        bp = storageserver.WriteBucketProxy(rb,
+                                            data_size=300,
+                                            segment_size=10,
+                                            num_segments=5,
+                                            num_share_hashes=3)
+        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
+    def test_readwrite(self):
+        # Let's pretend each share has 100 bytes of data, and that there are
+        # 4 segments (25 bytes each), and 8 shares total. So the three
+        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
+        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
+        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
+        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
+        # long. That should make the whole share:
+        #
+        # 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long
+        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
+                            for i in range(7)]
+        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
+                            for i in range(7)]
+        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
+                        for i in range(7)]
+        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
+                        for i in (1,9,13)]
+        uri_extension = "s" + "E"*498 + "e"
+        bw, rb, final = self.make_bucket("test_readwrite", 1406)
+        bp = storageserver.WriteBucketProxy(rb,
+                                            data_size=100,
+                                            segment_size=25,
+                                            num_segments=4,
+                                            num_share_hashes=3)
+        d = bp.start()
+        d.addCallback(lambda res: bp.put_block(0, "a"*25))
+        d.addCallback(lambda res: bp.put_block(1, "b"*25))
+        d.addCallback(lambda res: bp.put_block(2, "c"*25))
+        d.addCallback(lambda res: bp.put_block(3, "d"*25))
+        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
+        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
+        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
+        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
+        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
+        d.addCallback(lambda res: bp.close())
+        # now read everything back
+        def _start_reading(res):
+            br = storageserver.BucketReader(final)
+            rb = RemoteBucket()
+   = br
+            rbp = storageserver.ReadBucketProxy(rb)
+            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
+            d1 = rbp.start()
+            d1.addCallback(lambda res: rbp.get_block(0))
+            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
+            d1.addCallback(lambda res: rbp.get_block(1))
+            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
+            d1.addCallback(lambda res: rbp.get_block(2))
+            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
+            d1.addCallback(lambda res: rbp.get_block(3))
+            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25))
+            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
+            d1.addCallback(lambda res:
+                           self.failUnlessEqual(res, plaintext_hashes))
+            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
+            d1.addCallback(lambda res:
+                           self.failUnlessEqual(res, crypttext_hashes))
+            d1.addCallback(lambda res: rbp.get_block_hashes())
+            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
+            d1.addCallback(lambda res: rbp.get_share_hashes())
+            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
+            d1.addCallback(lambda res: rbp.get_uri_extension())
+            d1.addCallback(lambda res:
+                           self.failUnlessEqual(res, uri_extension))
+            return d1
+        d.addCallback(_start_reading)
+        return d
 class Server(unittest.TestCase):
@@ -74,7 +183,7 @@ class Server(unittest.TestCase):
         canary = Referenceable()
         already,writers = ss.remote_allocate_buckets("vid", [0,1,2],
-                                                     75, 25, canary)
+                                                     75, canary)
         self.failUnlessEqual(already, set())
         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@@ -82,19 +191,18 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
         for i,wb in writers.items():
-            wb.remote_put_block(0, "%25d" % i)
+            wb.remote_write(0, "%25d" % i)
         # now they should be readable
         b = ss.remote_get_buckets("vid")
         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
-        self.failUnlessEqual(b[0].remote_get_block(0),
-                             "%25d" % 0)
+        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
         # now if we about writing again, the server should offer those three
         # buckets as already present
         already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4],
-                                                     75, 25, canary)
+                                                     75, canary)
         self.failUnlessEqual(already, set([0,1,2]))
         self.failUnlessEqual(set(writers.keys()), set([3,4]))
@@ -103,7 +211,7 @@ class Server(unittest.TestCase):
         # upload into them a second time)
         already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5],
-                                                     75, 25, canary)
+                                                     75, canary)
         self.failUnlessEqual(already, set([2,3,4]))
         self.failUnlessEqual(set(writers.keys()), set([5]))
@@ -112,35 +220,42 @@ class Server(unittest.TestCase):
         canary = Referenceable()
         already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
-                                                     25, 5, canary)
+                                                     25, canary)
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 75 bytes provisionally allocated,
         # allowing only 25 more to be claimed
+        self.failUnlessEqual(len(ss._active_writers), 3)
         already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
-                                                       25, 5, canary)
+                                                       25, canary)
         self.failUnlessEqual(len(writers2), 1)
+        self.failUnlessEqual(len(ss._active_writers), 4)
         # we abandon the first set, so their provisional allocation should be
         # returned
         del already
         del writers
+        self.failUnlessEqual(len(ss._active_writers), 1)
         # and we close the second set, so their provisional allocation should
         # become real, long-term allocation
         for bw in writers2.values():
+            bw.remote_write(0, "a"*25)
         del already2
         del writers2
         del bw
+        self.failUnlessEqual(len(ss._active_writers), 0)
         # now there should be 25 bytes allocated, and 75 free
         already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
-                                                       25, 5, canary)
+                                                       25, canary)
         self.failUnlessEqual(len(writers3), 3)
+        self.failUnlessEqual(len(ss._active_writers), 3)
         del already3
         del writers3
+        self.failUnlessEqual(len(ss._active_writers), 0)
         del ss
@@ -150,5 +265,6 @@ class Server(unittest.TestCase):
         # would be more than 25 bytes and this test would need to be changed.
         ss = self.create("test_sizelimits", 100)
         already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
-                                                       25, 5, canary)
+                                                       25, canary)
         self.failUnlessEqual(len(writers4), 3)
+        self.failUnlessEqual(len(ss._active_writers), 3)
diff --git a/src/allmydata/ b/src/allmydata/
index 33afc0a2..5c327da4 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -24,13 +24,16 @@ class TooFullError(Exception):
 class PeerTracker:
     def __init__(self, peerid, permutedid, connection,
-                 sharesize, blocksize, crypttext_hash):
+                 sharesize, blocksize, num_segments, num_share_hashes,
+                 crypttext_hash):
         self.peerid = peerid
         self.permutedid = permutedid
         self.connection = connection # to an RIClient
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
         self.blocksize = blocksize
+        self.num_segments = num_segments
+        self.num_share_hashes = num_share_hashes
         self.crypttext_hash = crypttext_hash
         self._storageserver = None
@@ -54,8 +57,13 @@ class PeerTracker:
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
-        b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref))
-                    for sharenum, rref in buckets.iteritems() ] )
+        b = {}
+        for sharenum, rref in buckets.iteritems():
+            bp = storageserver.WriteBucketProxy(rref, self.sharesize,
+                                                self.blocksize,
+                                                self.num_segments,
+                                                self.num_share_hashes)
+            b[sharenum] = bp
         return (alreadygot, set(b.keys()))
@@ -129,8 +137,14 @@ class FileUploader:
         # responsible for handling the data and sending out the shares.
         peers = self._client.get_permuted_peers(self._crypttext_hash)
         assert peers
+        # TODO: eek, don't pull this from here, find a better way. gross.
+        num_segments = self._encoder.uri_extension_data['num_segments']
+        from allmydata.util.mathutil import next_power_of_k
+        import math
+        num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
         trackers = [ PeerTracker(peerid, permutedid, conn,
                                  share_size, block_size,
+                                 num_segments, num_share_hashes,
                      for permutedid, peerid, conn in peers ]
         self.usable_peers = set(trackers) # this set shrinks over time