From a4d1a0b4950d738a51de1f9fc56669bd73f9d1a2 Mon Sep 17 00:00:00 2001
From: Daira Hopwood <daira@jacaranda.org>
Date: Wed, 9 Apr 2014 00:56:58 +0100
Subject: [PATCH] Changes to Bucket{Reader,Writer} and disk backend (rebased).

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
---
 src/allmydata/mutable/layout.py               |   5 +
 .../storage/backends/disk/immutable.py        | 167 +++++++--
 .../storage/backends/disk/mutable.py          | 321 ++++++++++--------
 src/allmydata/storage/bucket.py               | 161 ++++-----
 src/allmydata/storage/shares.py               |  14 -
 src/allmydata/test/common.py                  |   6 +-
 6 files changed, 389 insertions(+), 285 deletions(-)
 delete mode 100644 src/allmydata/storage/shares.py

diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py
index b938794f..7e20996f 100644
--- a/src/allmydata/mutable/layout.py
+++ b/src/allmydata/mutable/layout.py
@@ -71,6 +71,11 @@ HEADER_LENGTH = struct.calcsize(HEADER)
 OFFSETS = ">LLLLQQ"
 OFFSETS_LENGTH = struct.calcsize(OFFSETS)
 
+# our sharefiles share with a recognizable string, plus some random
+# binary data to reduce the chance that a regular text file will look
+# like a sharefile.
+MUTABLE_MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+
 MAX_MUTABLE_SHARE_SIZE = 69105*1000*1000*1000*1000 # 69105 TB, kind of arbitrary
 
 
diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py
index a8b0fc08..b7bba0db 100644
--- a/src/allmydata/storage/backends/disk/immutable.py
+++ b/src/allmydata/storage/backends/disk/immutable.py
@@ -1,15 +1,19 @@
 
-import os, struct
+import os, os.path, struct
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IShareForReading, IShareForWriting
 
 from allmydata.util import fileutil
-from allmydata.util.fileutil import get_used_space
-from allmydata.util.assertutil import precondition
-from allmydata.storage.common import UnknownImmutableContainerVersionError, \
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, \
      DataTooLargeError
 
 
-# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
-# can be accessed by RIBucketWriter.write and RIBucketReader.read .
+# Each share file (in storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM) contains
+# share data that can be accessed by RIBucketWriter.write and RIBucketReader.read .
 
 # The share file has the following layout:
 #  0x00: share file version number, four bytes, current version is 1
@@ -25,25 +29,41 @@ from allmydata.storage.common import UnknownImmutableContainerVersionError, \
 # over when the server was upgraded. All lease information is now kept in the
 # leasedb.
 
+class ImmutableDiskShare(object):
+    implements(IShareForReading, IShareForWriting)
 
-class ShareFile:
     sharetype = "immutable"
     LEASE_SIZE = struct.calcsize(">L32s32sL")
     HEADER = ">LLL"
     HEADER_SIZE = struct.calcsize(HEADER)
     DATA_OFFSET = HEADER_SIZE
 
-    def __init__(self, filename, max_size=None, create=False):
-        """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
-        precondition((max_size is not None) or (not create), max_size, create)
-        self.home = filename
-        self._max_size = max_size
-        if create:
-            # touch the file, so later callers will see that we're working on
+    def __init__(self, home, storage_index, shnum, finalhome=None, allocated_data_length=None):
+        """
+        If allocated_data_length is not None then I won't allow more than allocated_data_length
+        to be written to me.
+        If finalhome is not None (meaning that we are creating the share) then allocated_data_length
+        must not be None.
+
+        Clients should use the load_immutable_disk_share and create_immutable_disk_share
+        factory functions rather than creating instances directly.
+        """
+        precondition((allocated_data_length is not None) or (finalhome is None),
+                     allocated_data_length=allocated_data_length, finalhome=finalhome)
+        self._storage_index = storage_index
+        self._allocated_data_length = allocated_data_length
+
+        # If we are creating the share, _finalhome refers to the final path and
+        # _home to the incoming path. Otherwise, _finalhome is None.
+        self._finalhome = finalhome
+        self._home = home
+        self._shnum = shnum
+
+        if self._finalhome is not None:
+            # Touch the file, so later callers will see that we're working on
             # it. Also construct the metadata.
-            assert not os.path.exists(self.home)
-            fileutil.make_dirs(os.path.dirname(self.home))
-            f = open(self.home, 'wb')
+            _assert(not os.path.exists(self._finalhome), finalhome=self._finalhome)
+            fileutil.make_dirs(os.path.dirname(self._home))
             # The second field -- the four-byte share data length -- is no
             # longer used as of Tahoe v1.3.0, but we continue to write it in
             # there in case someone downgrades a storage server from >=
@@ -53,36 +73,99 @@ class ShareFile:
             # the largest length that can fit into the field. That way, even
             # if this does happen, the old < v1.3.0 server will still allow
             # clients to read the first part of the share.
-            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
-            f.close()
-            self._data_length = max_size
+            fileutil.write(self._home, struct.pack(self.HEADER, 1, min(2**32-1, allocated_data_length), 0))
+            self._data_length = allocated_data_length
         else:
-            f = open(self.home, 'rb')
+            f = open(self._home, 'rb')
             try:
                 (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
             finally:
                 f.close()
             if version != 1:
-                msg = "sharefile %s had version %d but we wanted 1" % \
-                      (filename, version)
+                msg = "sharefile %r had version %d but we wanted 1" % (self._home, version)
                 raise UnknownImmutableContainerVersionError(msg)
 
-            filesize = os.stat(self.home).st_size
+            filesize = os.stat(self._home).st_size
             self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
 
         # TODO: raise a better exception.
-        assert self._data_length >= 0, self._data_length
+        _assert(self._data_length >= 0, data_length=self._data_length)
+
+    def __repr__(self):
+        return ("<ImmutableDiskShare %s:%r at %r>"
+                % (si_b2a(self._storage_index or ""), self._shnum, self._home))
+
+    def close(self):
+        fileutil.make_dirs(os.path.dirname(self._finalhome))
+        fileutil.move_into_place(self._home, self._finalhome)
+
+        # self._home is like storage/shares/incoming/ab/abcde/4 .
+        # We try to delete the parent (.../ab/abcde) to avoid leaving
+        # these directories lying around forever, but the delete might
+        # fail if we're working on another share for the same storage
+        # index (like ab/abcde/5). The alternative approach would be to
+        # use a hierarchy of objects (PrefixHolder, BucketHolder,
+        # ShareWriter), each of which is responsible for a single
+        # directory on disk, and have them use reference counting of
+        # their children to know when they should do the rmdir. This
+        # approach is simpler, but relies on os.rmdir (used by
+        # rmdir_if_empty) refusing to delete a non-empty directory.
+        # Do *not* use fileutil.remove() here!
+        parent = os.path.dirname(self._home)
+        fileutil.rmdir_if_empty(parent)
+
+        # we also delete the grandparent (prefix) directory, .../ab ,
+        # again to avoid leaving directories lying around. This might
+        # fail if there is another bucket open that shares a prefix (like
+        # ab/abfff).
+        fileutil.rmdir_if_empty(os.path.dirname(parent))
+
+        # we leave the great-grandparent (incoming/) directory in place.
+
+        self._home = self._finalhome
+        self._finalhome = None
+        return defer.succeed(None)
 
     def get_used_space(self):
-        return get_used_space(self.home)
+        return (fileutil.get_used_space(self._finalhome) +
+                fileutil.get_used_space(self._home))
+
+    def get_storage_index(self):
+        return self._storage_index
+
+    def get_storage_index_string(self):
+        return si_b2a(self._storage_index)
+
+    def get_shnum(self):
+        return self._shnum
 
     def unlink(self):
-        os.unlink(self.home)
+        fileutil.remove(self._home)
+        return defer.succeed(None)
+
+    def get_allocated_data_length(self):
+        return self._allocated_data_length
 
     def get_size(self):
-        return os.stat(self.home).st_size
+        return os.stat(self._home).st_size
 
-    def read_share_data(self, offset, length):
+    def get_data_length(self):
+        return self._data_length
+
+    def readv(self, readv):
+        datav = []
+        f = open(self._home, 'rb')
+        try:
+            for (offset, length) in readv:
+                datav.append(self._read_share_data(f, offset, length))
+        finally:
+            f.close()
+        return defer.succeed(datav)
+
+    def _get_path(self):
+        return self._home
+
+    def _read_share_data(self, f, offset, length):
         precondition(offset >= 0)
 
         # Reads beyond the end of the data are truncated. Reads that start
@@ -91,23 +174,35 @@ class ShareFile:
         actuallength = max(0, min(length, self._data_length - offset))
         if actuallength == 0:
             return ""
-        f = open(self.home, 'rb')
+        f.seek(seekpos)
+        return f.read(actuallength)
+
+    def read_share_data(self, offset, length):
+        f = open(self._home, 'rb')
         try:
-            f.seek(seekpos)
-            return f.read(actuallength)
+            return defer.succeed(self._read_share_data(f, offset, length))
         finally:
             f.close()
 
     def write_share_data(self, offset, data):
         length = len(data)
         precondition(offset >= 0, offset)
-        if self._max_size is not None and offset+length > self._max_size:
-            raise DataTooLargeError(self._max_size, offset, length)
-        f = open(self.home, 'rb+')
+        if self._allocated_data_length is not None and offset+length > self._allocated_data_length:
+            raise DataTooLargeError(self._allocated_data_length, offset, length)
+        f = open(self._home, 'rb+')
         try:
             real_offset = self.DATA_OFFSET + offset
             f.seek(real_offset)
-            assert f.tell() == real_offset
+            _assert(f.tell() == real_offset)
             f.write(data)
+            return defer.succeed(None)
         finally:
             f.close()
+
+
+def load_immutable_disk_share(home, storage_index=None, shnum=None):
+    return ImmutableDiskShare(home, storage_index=storage_index, shnum=shnum)
+
+def create_immutable_disk_share(home, finalhome, allocated_data_length, storage_index=None, shnum=None):
+    return ImmutableDiskShare(home, finalhome=finalhome, allocated_data_length=allocated_data_length,
+                              storage_index=storage_index, shnum=shnum)
diff --git a/src/allmydata/storage/backends/disk/mutable.py b/src/allmydata/storage/backends/disk/mutable.py
index 4d834958..8eaadd25 100644
--- a/src/allmydata/storage/backends/disk/mutable.py
+++ b/src/allmydata/storage/backends/disk/mutable.py
@@ -1,18 +1,22 @@
 
 import os, struct
 
-from allmydata.interfaces import BadWriteEnablerError
-from allmydata.util import idlib, log
-from allmydata.util.assertutil import precondition
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IMutableShare, BadWriteEnablerError
+
+from allmydata.util import fileutil, idlib, log
+from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.hashutil import timing_safe_compare
-from allmydata.util.fileutil import get_used_space
-from allmydata.storage.common import UnknownMutableContainerVersionError, \
+from allmydata.storage.common import si_b2a, UnknownMutableContainerVersionError, \
      DataTooLargeError
-from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.backends.base import testv_compare
+from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
 
 
-# the MutableShareFile is like the ShareFile, but used for mutable data. It
-# has a different layout. See docs/mutable.txt for more details.
+# MutableDiskShare is like ImmutableDiskShare, but used for mutable data.
+# Mutable shares have a different layout. See docs/mutable.rst for more details.
 
 # #   offset    size    name
 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
@@ -20,86 +24,119 @@ from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
 # 3   52        32      write enabler
 # 4   84        8       data size (actual share data present) (a)
 # 5   92        8       offset of (8) count of extra leases (after data)
-# 6   100       368     four leases, 92 bytes each
-#                        0    4   ownerid (0 means "no lease here")
-#                        4    4   expiration timestamp
-#                        8   32   renewal token
-#                        40  32   cancel token
-#                        72  20   nodeid which accepted the tokens
+# 6   100       368     four leases, 92 bytes each (ignored)
 # 7   468       (a)     data
 # 8   ??        4       count of extra leases
-# 9   ??        n*92    extra leases
+# 9   ??        n*92    extra leases (ignored)
 
 
-# The struct module doc says that L's are 4 bytes in size., and that Q's are
+# The struct module doc says that L's are 4 bytes in size, and that Q's are
 # 8 bytes in size. Since compatibility depends upon this, double-check it.
 assert struct.calcsize(">L") == 4, struct.calcsize(">L")
 assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
 
-class MutableShareFile:
+
+class MutableDiskShare(object):
+    implements(IMutableShare)
 
     sharetype = "mutable"
     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
-    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
+    EXTRA_LEASE_COUNT_OFFSET = DATA_LENGTH_OFFSET + 8
+    HEADER = ">32s20s32sQQ"
+    HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases
     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
-    assert LEASE_SIZE == 92
+    assert LEASE_SIZE == 92, LEASE_SIZE
     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
     assert DATA_OFFSET == 468, DATA_OFFSET
-    # our sharefiles share with a recognizable string, plus some random
-    # binary data to reduce the chance that a regular text file will look
-    # like a sharefile.
-    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+
+    MAGIC = MUTABLE_MAGIC
     assert len(MAGIC) == 32
     MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
-    # TODO: decide upon a policy for max share size
 
-    def __init__(self, filename, parent=None):
-        self.home = filename
-        if os.path.exists(self.home):
+    def __init__(self, home, storage_index, shnum, parent=None):
+        """
+        Clients should use the load_mutable_disk_share and create_mutable_disk_share
+        factory functions rather than creating instances directly.
+        """
+        self._storage_index = storage_index
+        self._shnum = shnum
+        self._home = home
+        if os.path.exists(self._home):
             # we don't cache anything, just check the magic
-            f = open(self.home, 'rb')
-            data = f.read(self.HEADER_SIZE)
-            (magic,
-             write_enabler_nodeid, write_enabler,
-             data_length, extra_lease_offset) = \
-             struct.unpack(">32s20s32sQQ", data)
-            if magic != self.MAGIC:
-                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
-                      (filename, magic, self.MAGIC)
-                raise UnknownMutableContainerVersionError(msg)
+            f = open(self._home, 'rb')
+            try:
+                data = f.read(self.HEADER_SIZE)
+                (magic,
+                 _write_enabler_nodeid, _write_enabler,
+                 _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data)
+                if magic != self.MAGIC:
+                    msg = "sharefile %r had magic '%r' but we wanted '%r'" % \
+                          (self._home, magic, self.MAGIC)
+                    raise UnknownMutableContainerVersionError(msg)
+            finally:
+                f.close()
         self.parent = parent # for logging
 
     def log(self, *args, **kwargs):
-        return self.parent.log(*args, **kwargs)
+        if self.parent:
+            return self.parent.log(*args, **kwargs)
 
-    def create(self, my_nodeid, write_enabler):
-        assert not os.path.exists(self.home)
+    def create(self, serverid, write_enabler):
+        _assert(not os.path.exists(self._home), "%r already exists and should not" % (self._home,))
         data_length = 0
-        extra_lease_offset = (self.HEADER_SIZE
-                              + 4 * self.LEASE_SIZE
-                              + data_length)
-        assert extra_lease_offset == self.DATA_OFFSET # true at creation
+        extra_lease_count_offset = (self.HEADER_SIZE
+                                    + 4 * self.LEASE_SIZE
+                                    + data_length)
+        assert extra_lease_count_offset == self.DATA_OFFSET # true at creation
         num_extra_leases = 0
-        f = open(self.home, 'wb')
-        header = struct.pack(">32s20s32sQQ",
-                             self.MAGIC, my_nodeid, write_enabler,
-                             data_length, extra_lease_offset,
-                             )
-        leases = ("\x00"*self.LEASE_SIZE) * 4
-        f.write(header + leases)
-        # data goes here, empty after creation
-        f.write(struct.pack(">L", num_extra_leases))
-        # extra leases go here, none at creation
-        f.close()
+        f = open(self._home, 'wb')
+        try:
+            header = struct.pack(self.HEADER,
+                                 self.MAGIC, serverid, write_enabler,
+                                 data_length, extra_lease_count_offset,
+                                 )
+            leases = ("\x00"*self.LEASE_SIZE) * 4
+            f.write(header + leases)
+            # data goes here, empty after creation
+            f.write(struct.pack(">L", num_extra_leases))
+            # extra leases go here, none at creation
+        finally:
+            f.close()
+        return self
+
+    def __repr__(self):
+        return ("<MutableDiskShare %s:%r at %r>"
+                % (si_b2a(self._storage_index or ""), self._shnum, self._home))
+
+    def get_size(self):
+        return os.stat(self._home).st_size
+
+    def get_data_length(self):
+        f = open(self._home, 'rb')
+        try:
+            data_length = self._read_data_length(f)
+        finally:
+            f.close()
+        return data_length
 
     def get_used_space(self):
-        return get_used_space(self.home)
+        return fileutil.get_used_space(self._home)
+
+    def get_storage_index(self):
+        return self._storage_index
+
+    def get_storage_index_string(self):
+        return si_b2a(self._storage_index)
+
+    def get_shnum(self):
+        return self._shnum
 
     def unlink(self):
-        os.unlink(self.home)
+        fileutil.remove(self._home)
+        return defer.succeed(None)
 
-    def get_size(self):
-        return os.stat(self.home).st_size
+    def _get_path(self):
+        return self._home
 
     def _read_data_length(self, f):
         f.seek(self.DATA_LENGTH_OFFSET)
@@ -107,34 +144,36 @@ class MutableShareFile:
         return data_length
 
     def _read_container_size(self, f):
-        f.seek(self.DATA_LENGTH_OFFSET + 8)
-        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
-        return extra_lease_offset - self.DATA_OFFSET
+        f.seek(self.EXTRA_LEASE_COUNT_OFFSET)
+        (extra_lease_count_offset,) = struct.unpack(">Q", f.read(8))
+        return extra_lease_count_offset - self.DATA_OFFSET
 
     def _write_data_length(self, f, data_length):
-        extra_lease_offset = self.DATA_OFFSET + data_length
+        extra_lease_count_offset = self.DATA_OFFSET + data_length
         f.seek(self.DATA_LENGTH_OFFSET)
-        f.write(struct.pack(">QQ", data_length, extra_lease_offset))
-        f.seek(extra_lease_offset)
+        f.write(struct.pack(">QQ", data_length, extra_lease_count_offset))
+        f.seek(extra_lease_count_offset)
         f.write(struct.pack(">L", 0))
 
     def _read_share_data(self, f, offset, length):
-        precondition(offset >= 0)
+        precondition(offset >= 0, offset=offset)
         data_length = self._read_data_length(f)
-        if offset+length > data_length:
+        if offset + length > data_length:
             # reads beyond the end of the data are truncated. Reads that
             # start beyond the end of the data return an empty string.
-            length = max(0, data_length-offset)
+            length = max(0, data_length - offset)
         if length == 0:
             return ""
-        precondition(offset+length <= data_length)
+        precondition(offset + length <= data_length)
         f.seek(self.DATA_OFFSET+offset)
         data = f.read(length)
         return data
 
     def _write_share_data(self, f, offset, data):
         length = len(data)
-        precondition(offset >= 0)
+        precondition(offset >= 0, offset=offset)
+        precondition(offset + length < self.MAX_SIZE, offset=offset, length=length)
+
         data_length = self._read_data_length(f)
 
         if offset+length >= data_length:
@@ -148,16 +187,16 @@ class MutableShareFile:
 
             # Fill any newly exposed empty space with 0's.
             if offset > data_length:
-                f.seek(self.DATA_OFFSET+data_length)
+                f.seek(self.DATA_OFFSET + data_length)
                 f.write('\x00'*(offset - data_length))
                 f.flush()
 
-            new_data_length = offset+length
+            new_data_length = offset + length
             self._write_data_length(f, new_data_length)
             # an interrupt here will result in a corrupted share
 
         # now all that's left to do is write out their data
-        f.seek(self.DATA_OFFSET+offset)
+        f.seek(self.DATA_OFFSET + offset)
         f.write(data)
         return
 
@@ -166,90 +205,84 @@ class MutableShareFile:
         data = f.read(self.HEADER_SIZE)
         (magic,
          write_enabler_nodeid, write_enabler,
-         data_length, extra_least_offset) = \
-         struct.unpack(">32s20s32sQQ", data)
+         _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data)
         assert magic == self.MAGIC
         return (write_enabler, write_enabler_nodeid)
 
     def readv(self, readv):
         datav = []
-        f = open(self.home, 'rb')
-        for (offset, length) in readv:
-            datav.append(self._read_share_data(f, offset, length))
-        f.close()
-        return datav
-
-    def check_write_enabler(self, write_enabler, si_s):
-        f = open(self.home, 'rb+')
-        (real_write_enabler, write_enabler_nodeid) = \
-                             self._read_write_enabler_and_nodeid(f)
-        f.close()
+        f = open(self._home, 'rb')
+        try:
+            for (offset, length) in readv:
+                datav.append(self._read_share_data(f, offset, length))
+        finally:
+            f.close()
+        return defer.succeed(datav)
+
+    def check_write_enabler(self, write_enabler):
+        f = open(self._home, 'rb+')
+        try:
+            (real_write_enabler, write_enabler_nodeid) = self._read_write_enabler_and_nodeid(f)
+        finally:
+            f.close()
         # avoid a timing attack
-        #if write_enabler != real_write_enabler:
         if not timing_safe_compare(write_enabler, real_write_enabler):
             # accomodate share migration by reporting the nodeid used for the
             # old write enabler.
-            self.log(format="bad write enabler on SI %(si)s,"
-                     " recorded by nodeid %(nodeid)s",
-                     facility="tahoe.storage",
-                     level=log.WEIRD, umid="cE1eBQ",
-                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
-            msg = "The write enabler was recorded by nodeid '%s'." % \
-                  (idlib.nodeid_b2a(write_enabler_nodeid),)
-            raise BadWriteEnablerError(msg)
+            def _bad_write_enabler():
+                nodeid_s = idlib.nodeid_b2a(write_enabler_nodeid)
+                self.log(format="bad write enabler on SI %(si)s,"
+                         " recorded by nodeid %(nodeid)s",
+                         facility="tahoe.storage",
+                         level=log.WEIRD, umid="cE1eBQ",
+                         si=self.get_storage_index_string(),
+                         nodeid=nodeid_s)
+                raise BadWriteEnablerError("The write enabler was recorded by nodeid '%s'."
+                                           % (nodeid_s,))
+            return defer.execute(_bad_write_enabler)
+        return defer.succeed(None)
 
     def check_testv(self, testv):
         test_good = True
-        f = open(self.home, 'rb+')
-        for (offset, length, operator, specimen) in testv:
-            data = self._read_share_data(f, offset, length)
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        f.close()
-        return test_good
+        f = open(self._home, 'rb+')
+        try:
+            for (offset, length, operator, specimen) in testv:
+                data = self._read_share_data(f, offset, length)
+                if not testv_compare(data, operator, specimen):
+                    test_good = False
+                    break
+        finally:
+            f.close()
+        return defer.succeed(test_good)
 
     def writev(self, datav, new_length):
-        f = open(self.home, 'rb+')
-        for (offset, data) in datav:
-            self._write_share_data(f, offset, data)
-        if new_length is not None:
-            cur_length = self._read_data_length(f)
-            if new_length < cur_length:
-                self._write_data_length(f, new_length)
-                # TODO: shrink the share file.
-        f.close()
-
-def testv_compare(a, op, b):
-    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
-    if op == "lt":
-        return a < b
-    if op == "le":
-        return a <= b
-    if op == "eq":
-        return a == b
-    if op == "ne":
-        return a != b
-    if op == "ge":
-        return a >= b
-    if op == "gt":
-        return a > b
-    # never reached
-
-class EmptyShare:
+        precondition(new_length is None or new_length >= 0, new_length=new_length)
 
-    def check_testv(self, testv):
-        test_good = True
-        for (offset, length, operator, specimen) in testv:
-            data = ""
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        return test_good
-
-def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
-    ms = MutableShareFile(filename, parent)
-    ms.create(my_nodeid, write_enabler)
-    del ms
-    return MutableShareFile(filename, parent)
+        for (offset, data) in datav:
+            precondition(offset >= 0, offset=offset)
+            if offset + len(data) > self.MAX_SIZE:
+                raise DataTooLargeError()
 
+        f = open(self._home, 'rb+')
+        try:
+            for (offset, data) in datav:
+                self._write_share_data(f, offset, data)
+            if new_length is not None:
+                cur_length = self._read_data_length(f)
+                if new_length < cur_length:
+                    self._write_data_length(f, new_length)
+                    # TODO: shrink the share file.
+        finally:
+            f.close()
+        return defer.succeed(None)
+
+    def close(self):
+        return defer.succeed(None)
+
+
+def load_mutable_disk_share(home, storage_index=None, shnum=None, parent=None):
+    return MutableDiskShare(home, storage_index, shnum, parent)
+
+def create_mutable_disk_share(home, serverid, write_enabler, storage_index=None, shnum=None, parent=None):
+    ms = MutableDiskShare(home, storage_index, shnum, parent)
+    return ms.create(serverid, write_enabler)
diff --git a/src/allmydata/storage/bucket.py b/src/allmydata/storage/bucket.py
index 1cf99cbf..e63ad462 100644
--- a/src/allmydata/storage/bucket.py
+++ b/src/allmydata/storage/bucket.py
@@ -1,146 +1,131 @@
 
-import os, time
+import time
 
 from foolscap.api import Referenceable
+from twisted.internet import defer
 
 from zope.interface import implements
 from allmydata.interfaces import RIBucketWriter, RIBucketReader
 
-from allmydata.util import base32, fileutil, log
-from allmydata.util.fileutil import get_used_space
+from allmydata.util import base32, log
 from allmydata.util.assertutil import precondition
-from allmydata.storage.backends.disk.immutable import ShareFile
 from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
 
 
-
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)
 
-    def __init__(self, ss, account, storage_index, shnum,
-                 incominghome, finalhome, max_size, canary):
-        self.ss = ss
-        self.incominghome = incominghome
-        self.finalhome = finalhome
-        self._max_size = max_size # don't allow the client to write more than this
+    def __init__(self, account, share, canary):
+        self.ss = account.server
         self._account = account
-        self._storage_index = storage_index
-        self._shnum = shnum
+        self._share = share
         self._canary = canary
         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
         self.closed = False
         self.throw_out_all_data = False
-        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
-        self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
+
+        self._account.add_share(share.get_storage_index(), share.get_shnum(),
+                                share.get_allocated_data_length(), SHARETYPE_IMMUTABLE)
 
     def allocated_size(self):
-        return self._max_size
+        return self._share.get_allocated_data_length()
+
+    def _add_latency(self, res, name, start):
+        self.ss.add_latency(name, time.time() - start)
+        self.ss.count(name)
+        return res
 
     def remote_write(self, offset, data):
         start = time.time()
         precondition(not self.closed)
         if self.throw_out_all_data:
-            return
-        self._sharefile.write_share_data(offset, data)
-        self.ss.add_latency("write", time.time() - start)
-        self.ss.count("write")
+            return defer.succeed(None)
+        d = self._share.write_share_data(offset, data)
+        d.addBoth(self._add_latency, "write", start)
+        return d
 
     def remote_close(self):
         precondition(not self.closed)
         start = time.time()
 
-        fileutil.make_dirs(os.path.dirname(self.finalhome))
-        fileutil.rename(self.incominghome, self.finalhome)
-        try:
-            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
-            # We try to delete the parent (.../ab/abcde) to avoid leaving
-            # these directories lying around forever, but the delete might
-            # fail if we're working on another share for the same storage
-            # index (like ab/abcde/5). The alternative approach would be to
-            # use a hierarchy of objects (PrefixHolder, BucketHolder,
-            # ShareWriter), each of which is responsible for a single
-            # directory on disk, and have them use reference counting of
-            # their children to know when they should do the rmdir. This
-            # approach is simpler, but relies on os.rmdir refusing to delete
-            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
-            os.rmdir(os.path.dirname(self.incominghome))
-            # we also delete the grandparent (prefix) directory, .../ab ,
-            # again to avoid leaving directories lying around. This might
-            # fail if there is another bucket open that shares a prefix (like
-            # ab/abfff).
-            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
-            # we leave the great-grandparent (incoming/) directory in place.
-        except EnvironmentError:
-            # ignore the "can't rmdir because the directory is not empty"
-            # exceptions, those are normal consequences of the
-            # above-mentioned conditions.
-            pass
-        self._sharefile = None
-        self.closed = True
-        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
-        filelen = get_used_space(self.finalhome)
-        self.ss.bucket_writer_closed(self, filelen)
-        self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
-        self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
-        self.ss.add_latency("close", time.time() - start)
-        self.ss.count("close")
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self._share.close())
+        d.addCallback(lambda ign: self._share.get_used_space())
+        def _got_used_space(used_space):
+            storage_index = self._share.get_storage_index()
+            shnum = self._share.get_shnum()
+            self._share = None
+            self.closed = True
+            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+            self.ss.bucket_writer_closed(self, used_space)
+            self._account.add_or_renew_default_lease(storage_index, shnum)
+            self._account.mark_share_as_stable(storage_index, shnum, used_space)
+        d.addCallback(_got_used_space)
+        d.addBoth(self._add_latency, "close", start)
+        return d
 
     def _disconnected(self):
         if not self.closed:
-            self._abort()
+            return self._abort()
+        return defer.succeed(None)
 
     def remote_abort(self):
-        log.msg("storage: aborting sharefile %s" % self.incominghome,
+        log.msg("storage: aborting write to share %r" % self._share,
                 facility="tahoe.storage", level=log.UNUSUAL)
         if not self.closed:
             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-        self._abort()
-        self.ss.count("abort")
+        d = self._abort()
+        def _count(ign):
+            self.ss.count("abort")
+        d.addBoth(_count)
+        return d
 
     def _abort(self):
+        d = defer.succeed(None)
         if self.closed:
-            return
-
-        os.remove(self.incominghome)
-        # if we were the last share to be moved, remove the incoming/
-        # directory that was our parent
-        parentdir = os.path.split(self.incominghome)[0]
-        if not os.listdir(parentdir):
-            os.rmdir(parentdir)
-        self._sharefile = None
-        self._account.remove_share_and_leases(self._storage_index, self._shnum)
+            return d
+        d.addCallback(lambda ign: self._share.unlink())
+        def _unlinked(ign):
+            self._share = None
 
-        # We are now considered closed for further writing. We must tell
-        # the storage server about this so that it stops expecting us to
-        # use the space it allocated for us earlier.
-        self.closed = True
-        self.ss.bucket_writer_closed(self, 0)
+            # We are now considered closed for further writing. We must tell
+            # the storage server about this so that it stops expecting us to
+            # use the space it allocated for us earlier.
+            self.closed = True
+            self.ss.bucket_writer_closed(self, 0)
+        d.addCallback(_unlinked)
+        return d
 
 
 class BucketReader(Referenceable):
     implements(RIBucketReader)
 
-    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
-        self.ss = ss
-        self._share_file = ShareFile(sharefname)
-        self.storage_index = storage_index
-        self.shnum = shnum
+    def __init__(self, account, share):
+        self.ss = account.server
+        self._account = account
+        self._share = share
+        self.storage_index = share.get_storage_index()
+        self.shnum = share.get_shnum()
 
     def __repr__(self):
         return "<%s %s %s>" % (self.__class__.__name__,
                                base32.b2a_l(self.storage_index[:8], 60),
                                self.shnum)
 
+    def _add_latency(self, res, name, start):
+        self.ss.add_latency(name, time.time() - start)
+        self.ss.count(name)
+        return res
+
     def remote_read(self, offset, length):
         start = time.time()
-        data = self._share_file.read_share_data(offset, length)
-        self.ss.add_latency("read", time.time() - start)
-        self.ss.count("read")
-        return data
+        d = self._share.read_share_data(offset, length)
+        d.addBoth(self._add_latency, "read", start)
+        return d
 
     def remote_advise_corrupt_share(self, reason):
-        return self.ss.client_advise_corrupt_share("immutable",
-                                                   self.storage_index,
-                                                   self.shnum,
-                                                   reason)
+        return self._account.remote_advise_corrupt_share("immutable",
+                                                         self.storage_index,
+                                                         self.shnum,
+                                                         reason)
diff --git a/src/allmydata/storage/shares.py b/src/allmydata/storage/shares.py
deleted file mode 100644
index bcdf98c4..00000000
--- a/src/allmydata/storage/shares.py
+++ /dev/null
@@ -1,14 +0,0 @@
-#! /usr/bin/python
-
-from allmydata.storage.backends.disk.mutable import MutableShareFile
-from allmydata.storage.backends.disk.immutable import ShareFile
-
-def get_share_file(filename):
-    f = open(filename, "rb")
-    prefix = f.read(32)
-    f.close()
-    if prefix == MutableShareFile.MAGIC:
-        return MutableShareFile(filename)
-    # otherwise assume it's immutable
-    return ShareFile(filename)
-
diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py
index fe4d2aa0..0d581755 100644
--- a/src/allmydata/test/common.py
+++ b/src/allmydata/test/common.py
@@ -17,7 +17,7 @@ from allmydata.check_results import CheckResults, CheckAndRepairResults, \
 from allmydata.storage_client import StubServer
 from allmydata.mutable.layout import unpack_header
 from allmydata.mutable.publish import MutableData
-from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableDiskShare
 from allmydata.util import hashutil, log, fileutil, pollmixin
 from allmydata.util.assertutil import precondition
 from allmydata.util.consumer import download_to_data
@@ -1307,8 +1307,8 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
 
 def _corrupt_mutable_share_data(data, debug=False):
     prefix = data[:32]
-    assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
-    data_offset = MutableShareFile.DATA_OFFSET
+    assert prefix == MutableDiskShare.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableDiskShare.MAGIC)
+    data_offset = MutableDiskShare.DATA_OFFSET
     sharetype = data[data_offset:data_offset+1]
     assert sharetype == "\x00", "non-SDMF mutable shares not supported"
     (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
-- 
2.45.2