Move code around and add new directories for cloud backend merge.
authorDavid-Sarah Hopwood <david-sarah@jacaranda.org>
Thu, 22 Nov 2012 05:33:35 +0000 (05:33 +0000)
committerDaira Hopwood <daira@jacaranda.org>
Fri, 17 Apr 2015 21:31:02 +0000 (22:31 +0100)
Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
16 files changed:
setup.py
src/allmydata/scripts/debug.py
src/allmydata/storage/backends/__init__.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/__init__.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/s3/__init__.py [new file with mode: 0644]
src/allmydata/storage/backends/disk/__init__.py [new file with mode: 0644]
src/allmydata/storage/backends/disk/immutable.py [new file with mode: 0644]
src/allmydata/storage/backends/disk/mutable.py [new file with mode: 0644]
src/allmydata/storage/backends/null/__init__.py [new file with mode: 0644]
src/allmydata/storage/immutable.py [deleted file]
src/allmydata/storage/mutable.py [deleted file]
src/allmydata/storage/server.py
src/allmydata/storage/shares.py
src/allmydata/test/common.py
src/allmydata/test/test_storage.py
src/allmydata/test/test_system.py

index d6fe60b9a41c34e7aa69e21b9183dbd441d72d44..ffa7ae9389603965c440d5ae328c52f7ff06a455 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -455,6 +455,11 @@ setup(name=APPNAME,
                 'allmydata.mutable',
                 'allmydata.scripts',
                 'allmydata.storage',
+                'allmydata.storage.backends',
+                'allmydata.storage.backends.cloud',
+                'allmydata.storage.backends.cloud.s3',
+                'allmydata.storage.backends.disk',
+                'allmydata.storage.backends.null',
                 'allmydata.test',
                 'allmydata.util',
                 'allmydata.web',
index 54dc1df229e758baeecb86ba573da8c3627efce6..79926a35accf956750f7ea4971efd62e1f8b0d9b 100644 (file)
@@ -35,7 +35,7 @@ verify-cap for the file that uses the share.
         self['filename'] = argv_to_abspath(filename)
 
 def dump_share(options):
-    from allmydata.storage.mutable import MutableShareFile
+    from allmydata.storage.backends.disk.mutable import MutableShareFile
     from allmydata.util.encodingutil import quote_output
 
     out = options.stdout
@@ -52,7 +52,7 @@ def dump_share(options):
     return dump_immutable_share(options)
 
 def dump_immutable_share(options):
-    from allmydata.storage.immutable import ShareFile
+    from allmydata.storage.backends.disk.immutable import ShareFile
 
     out = options.stdout
     f = ShareFile(options['filename'])
@@ -153,7 +153,7 @@ def format_expiration_time(expiration_time):
 
 
 def dump_mutable_share(options):
-    from allmydata.storage.mutable import MutableShareFile
+    from allmydata.storage.backends.disk.mutable import MutableShareFile
     from allmydata.util import base32, idlib
     out = options.stdout
     m = MutableShareFile(options['filename'])
@@ -635,8 +635,8 @@ def call(c, *args, **kwargs):
 
 def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     from allmydata import uri
-    from allmydata.storage.mutable import MutableShareFile
-    from allmydata.storage.immutable import ShareFile
+    from allmydata.storage.backends.disk.mutable import MutableShareFile
+    from allmydata.storage.backends.disk.immutable import ShareFile
     from allmydata.mutable.layout import unpack_share
     from allmydata.mutable.common import NeedMoreDataError
     from allmydata.immutable.layout import ReadBucketProxy
@@ -829,8 +829,8 @@ Obviously, this command should not be used in normal operation.
 
 def corrupt_share(options):
     import random
-    from allmydata.storage.mutable import MutableShareFile
-    from allmydata.storage.immutable import ShareFile
+    from allmydata.storage.backends.disk.mutable import MutableShareFile
+    from allmydata.storage.backends.disk.immutable import ShareFile
     from allmydata.mutable.layout import unpack_header
     from allmydata.immutable.layout import ReadBucketProxy
     out = options.stdout
diff --git a/src/allmydata/storage/backends/__init__.py b/src/allmydata/storage/backends/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/backends/cloud/__init__.py b/src/allmydata/storage/backends/cloud/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/backends/cloud/s3/__init__.py b/src/allmydata/storage/backends/cloud/s3/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/backends/disk/__init__.py b/src/allmydata/storage/backends/disk/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py
new file mode 100644 (file)
index 0000000..780b04a
--- /dev/null
@@ -0,0 +1,250 @@
+
+import os, struct, time
+
+from foolscap.api import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIBucketWriter, RIBucketReader
+from allmydata.util import base32, fileutil, log
+from allmydata.util.fileutil import get_used_space
+from allmydata.util.assertutil import precondition
+from allmydata.storage.common import UnknownImmutableContainerVersionError, \
+     DataTooLargeError
+from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
+
+
+# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
+# can be accessed by RIBucketWriter.write and RIBucketReader.read .
+
+# The share file has the following layout:
+#  0x00: share file version number, four bytes, current version is 1
+#  0x04: share data length, four bytes big-endian     # Footnote 1
+#  0x08: number of leases, four bytes big-endian = N  # Footnote 2
+#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
+#  filesize - 72*N: leases (ignored). Each lease is 72 bytes.
+
+# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers.
+
+# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers.
+# New shares will have a 0 here. Old shares will have whatever value was left
+# over when the server was upgraded. All lease information is now kept in the
+# leasedb.
+
+
+class ShareFile:
+    sharetype = "immutable"
+    LEASE_SIZE = struct.calcsize(">L32s32sL")
+    HEADER = ">LLL"
+    HEADER_SIZE = struct.calcsize(HEADER)
+    DATA_OFFSET = HEADER_SIZE
+
+    def __init__(self, filename, max_size=None, create=False):
+        """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
+        precondition((max_size is not None) or (not create), max_size, create)
+        self.home = filename
+        self._max_size = max_size
+        if create:
+            # touch the file, so later callers will see that we're working on
+            # it. Also construct the metadata.
+            assert not os.path.exists(self.home)
+            fileutil.make_dirs(os.path.dirname(self.home))
+            f = open(self.home, 'wb')
+            # The second field -- the four-byte share data length -- is no
+            # longer used as of Tahoe v1.3.0, but we continue to write it in
+            # there in case someone downgrades a storage server from >=
+            # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
+            # server to another, etc. We do saturation -- a share data length
+            # larger than 2**32-1 (what can fit into the field) is marked as
+            # the largest length that can fit into the field. That way, even
+            # if this does happen, the old < v1.3.0 server will still allow
+            # clients to read the first part of the share.
+            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
+            f.close()
+            self._data_length = max_size
+        else:
+            f = open(self.home, 'rb')
+            try:
+                (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
+            finally:
+                f.close()
+            if version != 1:
+                msg = "sharefile %s had version %d but we wanted 1" % \
+                      (filename, version)
+                raise UnknownImmutableContainerVersionError(msg)
+
+            filesize = os.stat(self.home).st_size
+            self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
+
+        # TODO: raise a better exception.
+        assert self._data_length >= 0, self._data_length
+
+    def get_used_space(self):
+        return get_used_space(self.home)
+
+    def unlink(self):
+        os.unlink(self.home)
+
+    def get_size(self):
+        return os.stat(self.home).st_size
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+
+        # Reads beyond the end of the data are truncated. Reads that start
+        # beyond the end of the data return an empty string.
+        seekpos = self.DATA_OFFSET + offset
+        actuallength = max(0, min(length, self._data_length - offset))
+        if actuallength == 0:
+            return ""
+        f = open(self.home, 'rb')
+        try:
+            f.seek(seekpos)
+            return f.read(actuallength)
+        finally:
+            f.close()
+
+    def write_share_data(self, offset, data):
+        length = len(data)
+        precondition(offset >= 0, offset)
+        if self._max_size is not None and offset+length > self._max_size:
+            raise DataTooLargeError(self._max_size, offset, length)
+        f = open(self.home, 'rb+')
+        try:
+            real_offset = self.DATA_OFFSET + offset
+            f.seek(real_offset)
+            assert f.tell() == real_offset
+            f.write(data)
+        finally:
+            f.close()
+
+
+class BucketWriter(Referenceable):
+    implements(RIBucketWriter)
+
+    def __init__(self, ss, account, storage_index, shnum,
+                 incominghome, finalhome, max_size, canary):
+        self.ss = ss
+        self.incominghome = incominghome
+        self.finalhome = finalhome
+        self._max_size = max_size # don't allow the client to write more than this
+        self._account = account
+        self._storage_index = storage_index
+        self._shnum = shnum
+        self._canary = canary
+        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
+        self.closed = False
+        self.throw_out_all_data = False
+        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
+        self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
+
+    def allocated_size(self):
+        return self._max_size
+
+    def remote_write(self, offset, data):
+        start = time.time()
+        precondition(not self.closed)
+        if self.throw_out_all_data:
+            return
+        self._sharefile.write_share_data(offset, data)
+        self.ss.add_latency("write", time.time() - start)
+        self.ss.count("write")
+
+    def remote_close(self):
+        precondition(not self.closed)
+        start = time.time()
+
+        fileutil.make_dirs(os.path.dirname(self.finalhome))
+        fileutil.rename(self.incominghome, self.finalhome)
+        try:
+            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
+            # We try to delete the parent (.../ab/abcde) to avoid leaving
+            # these directories lying around forever, but the delete might
+            # fail if we're working on another share for the same storage
+            # index (like ab/abcde/5). The alternative approach would be to
+            # use a hierarchy of objects (PrefixHolder, BucketHolder,
+            # ShareWriter), each of which is responsible for a single
+            # directory on disk, and have them use reference counting of
+            # their children to know when they should do the rmdir. This
+            # approach is simpler, but relies on os.rmdir refusing to delete
+            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
+            os.rmdir(os.path.dirname(self.incominghome))
+            # we also delete the grandparent (prefix) directory, .../ab ,
+            # again to avoid leaving directories lying around. This might
+            # fail if there is another bucket open that shares a prefix (like
+            # ab/abfff).
+            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
+            # we leave the great-grandparent (incoming/) directory in place.
+        except EnvironmentError:
+            # ignore the "can't rmdir because the directory is not empty"
+            # exceptions, those are normal consequences of the
+            # above-mentioned conditions.
+            pass
+        self._sharefile = None
+        self.closed = True
+        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+        filelen = get_used_space(self.finalhome)
+        self.ss.bucket_writer_closed(self, filelen)
+        self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
+        self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
+        self.ss.add_latency("close", time.time() - start)
+        self.ss.count("close")
+
+    def _disconnected(self):
+        if not self.closed:
+            self._abort()
+
+    def remote_abort(self):
+        log.msg("storage: aborting sharefile %s" % self.incominghome,
+                facility="tahoe.storage", level=log.UNUSUAL)
+        if not self.closed:
+            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+        self._abort()
+        self.ss.count("abort")
+
+    def _abort(self):
+        if self.closed:
+            return
+
+        os.remove(self.incominghome)
+        # if we were the last share to be moved, remove the incoming/
+        # directory that was our parent
+        parentdir = os.path.split(self.incominghome)[0]
+        if not os.listdir(parentdir):
+            os.rmdir(parentdir)
+        self._sharefile = None
+        self._account.remove_share_and_leases(self._storage_index, self._shnum)
+
+        # We are now considered closed for further writing. We must tell
+        # the storage server about this so that it stops expecting us to
+        # use the space it allocated for us earlier.
+        self.closed = True
+        self.ss.bucket_writer_closed(self, 0)
+
+
+class BucketReader(Referenceable):
+    implements(RIBucketReader)
+
+    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
+        self.ss = ss
+        self._share_file = ShareFile(sharefname)
+        self.storage_index = storage_index
+        self.shnum = shnum
+
+    def __repr__(self):
+        return "<%s %s %s>" % (self.__class__.__name__,
+                               base32.b2a_l(self.storage_index[:8], 60),
+                               self.shnum)
+
+    def remote_read(self, offset, length):
+        start = time.time()
+        data = self._share_file.read_share_data(offset, length)
+        self.ss.add_latency("read", time.time() - start)
+        self.ss.count("read")
+        return data
+
+    def remote_advise_corrupt_share(self, reason):
+        return self.ss.client_advise_corrupt_share("immutable",
+                                                   self.storage_index,
+                                                   self.shnum,
+                                                   reason)
diff --git a/src/allmydata/storage/backends/disk/mutable.py b/src/allmydata/storage/backends/disk/mutable.py
new file mode 100644 (file)
index 0000000..4d83495
--- /dev/null
@@ -0,0 +1,255 @@
+
+import os, struct
+
+from allmydata.interfaces import BadWriteEnablerError
+from allmydata.util import idlib, log
+from allmydata.util.assertutil import precondition
+from allmydata.util.hashutil import timing_safe_compare
+from allmydata.util.fileutil import get_used_space
+from allmydata.storage.common import UnknownMutableContainerVersionError, \
+     DataTooLargeError
+from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+
+
+# the MutableShareFile is like the ShareFile, but used for mutable data. It
+# has a different layout. See docs/mutable.txt for more details.
+
+# #   offset    size    name
+# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
+# 2   32        20      write enabler's nodeid
+# 3   52        32      write enabler
+# 4   84        8       data size (actual share data present) (a)
+# 5   92        8       offset of (8) count of extra leases (after data)
+# 6   100       368     four leases, 92 bytes each
+#                        0    4   ownerid (0 means "no lease here")
+#                        4    4   expiration timestamp
+#                        8   32   renewal token
+#                        40  32   cancel token
+#                        72  20   nodeid which accepted the tokens
+# 7   468       (a)     data
+# 8   ??        4       count of extra leases
+# 9   ??        n*92    extra leases
+
+
+# The struct module doc says that L's are 4 bytes in size., and that Q's are
+# 8 bytes in size. Since compatibility depends upon this, double-check it.
+assert struct.calcsize(">L") == 4, struct.calcsize(">L")
+assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
+
+class MutableShareFile:
+
+    sharetype = "mutable"
+    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
+    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
+    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
+    assert LEASE_SIZE == 92
+    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+    assert DATA_OFFSET == 468, DATA_OFFSET
+    # our sharefiles share with a recognizable string, plus some random
+    # binary data to reduce the chance that a regular text file will look
+    # like a sharefile.
+    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+    assert len(MAGIC) == 32
+    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
+    # TODO: decide upon a policy for max share size
+
+    def __init__(self, filename, parent=None):
+        self.home = filename
+        if os.path.exists(self.home):
+            # we don't cache anything, just check the magic
+            f = open(self.home, 'rb')
+            data = f.read(self.HEADER_SIZE)
+            (magic,
+             write_enabler_nodeid, write_enabler,
+             data_length, extra_lease_offset) = \
+             struct.unpack(">32s20s32sQQ", data)
+            if magic != self.MAGIC:
+                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
+                      (filename, magic, self.MAGIC)
+                raise UnknownMutableContainerVersionError(msg)
+        self.parent = parent # for logging
+
+    def log(self, *args, **kwargs):
+        return self.parent.log(*args, **kwargs)
+
+    def create(self, my_nodeid, write_enabler):
+        assert not os.path.exists(self.home)
+        data_length = 0
+        extra_lease_offset = (self.HEADER_SIZE
+                              + 4 * self.LEASE_SIZE
+                              + data_length)
+        assert extra_lease_offset == self.DATA_OFFSET # true at creation
+        num_extra_leases = 0
+        f = open(self.home, 'wb')
+        header = struct.pack(">32s20s32sQQ",
+                             self.MAGIC, my_nodeid, write_enabler,
+                             data_length, extra_lease_offset,
+                             )
+        leases = ("\x00"*self.LEASE_SIZE) * 4
+        f.write(header + leases)
+        # data goes here, empty after creation
+        f.write(struct.pack(">L", num_extra_leases))
+        # extra leases go here, none at creation
+        f.close()
+
+    def get_used_space(self):
+        return get_used_space(self.home)
+
+    def unlink(self):
+        os.unlink(self.home)
+
+    def get_size(self):
+        return os.stat(self.home).st_size
+
+    def _read_data_length(self, f):
+        f.seek(self.DATA_LENGTH_OFFSET)
+        (data_length,) = struct.unpack(">Q", f.read(8))
+        return data_length
+
+    def _read_container_size(self, f):
+        f.seek(self.DATA_LENGTH_OFFSET + 8)
+        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
+        return extra_lease_offset - self.DATA_OFFSET
+
+    def _write_data_length(self, f, data_length):
+        extra_lease_offset = self.DATA_OFFSET + data_length
+        f.seek(self.DATA_LENGTH_OFFSET)
+        f.write(struct.pack(">QQ", data_length, extra_lease_offset))
+        f.seek(extra_lease_offset)
+        f.write(struct.pack(">L", 0))
+
+    def _read_share_data(self, f, offset, length):
+        precondition(offset >= 0)
+        data_length = self._read_data_length(f)
+        if offset+length > data_length:
+            # reads beyond the end of the data are truncated. Reads that
+            # start beyond the end of the data return an empty string.
+            length = max(0, data_length-offset)
+        if length == 0:
+            return ""
+        precondition(offset+length <= data_length)
+        f.seek(self.DATA_OFFSET+offset)
+        data = f.read(length)
+        return data
+
+    def _write_share_data(self, f, offset, data):
+        length = len(data)
+        precondition(offset >= 0)
+        data_length = self._read_data_length(f)
+
+        if offset+length >= data_length:
+            # They are expanding their data size.
+
+            if offset+length > self.MAX_SIZE:
+                raise DataTooLargeError()
+
+            # Their data now fits in the current container. We must write
+            # their new data and modify the recorded data size.
+
+            # Fill any newly exposed empty space with 0's.
+            if offset > data_length:
+                f.seek(self.DATA_OFFSET+data_length)
+                f.write('\x00'*(offset - data_length))
+                f.flush()
+
+            new_data_length = offset+length
+            self._write_data_length(f, new_data_length)
+            # an interrupt here will result in a corrupted share
+
+        # now all that's left to do is write out their data
+        f.seek(self.DATA_OFFSET+offset)
+        f.write(data)
+        return
+
+    def _read_write_enabler_and_nodeid(self, f):
+        f.seek(0)
+        data = f.read(self.HEADER_SIZE)
+        (magic,
+         write_enabler_nodeid, write_enabler,
+         data_length, extra_least_offset) = \
+         struct.unpack(">32s20s32sQQ", data)
+        assert magic == self.MAGIC
+        return (write_enabler, write_enabler_nodeid)
+
+    def readv(self, readv):
+        datav = []
+        f = open(self.home, 'rb')
+        for (offset, length) in readv:
+            datav.append(self._read_share_data(f, offset, length))
+        f.close()
+        return datav
+
+    def check_write_enabler(self, write_enabler, si_s):
+        f = open(self.home, 'rb+')
+        (real_write_enabler, write_enabler_nodeid) = \
+                             self._read_write_enabler_and_nodeid(f)
+        f.close()
+        # avoid a timing attack
+        #if write_enabler != real_write_enabler:
+        if not timing_safe_compare(write_enabler, real_write_enabler):
+            # accomodate share migration by reporting the nodeid used for the
+            # old write enabler.
+            self.log(format="bad write enabler on SI %(si)s,"
+                     " recorded by nodeid %(nodeid)s",
+                     facility="tahoe.storage",
+                     level=log.WEIRD, umid="cE1eBQ",
+                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
+            msg = "The write enabler was recorded by nodeid '%s'." % \
+                  (idlib.nodeid_b2a(write_enabler_nodeid),)
+            raise BadWriteEnablerError(msg)
+
+    def check_testv(self, testv):
+        test_good = True
+        f = open(self.home, 'rb+')
+        for (offset, length, operator, specimen) in testv:
+            data = self._read_share_data(f, offset, length)
+            if not testv_compare(data, operator, specimen):
+                test_good = False
+                break
+        f.close()
+        return test_good
+
+    def writev(self, datav, new_length):
+        f = open(self.home, 'rb+')
+        for (offset, data) in datav:
+            self._write_share_data(f, offset, data)
+        if new_length is not None:
+            cur_length = self._read_data_length(f)
+            if new_length < cur_length:
+                self._write_data_length(f, new_length)
+                # TODO: shrink the share file.
+        f.close()
+
+def testv_compare(a, op, b):
+    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+    if op == "lt":
+        return a < b
+    if op == "le":
+        return a <= b
+    if op == "eq":
+        return a == b
+    if op == "ne":
+        return a != b
+    if op == "ge":
+        return a >= b
+    if op == "gt":
+        return a > b
+    # never reached
+
+class EmptyShare:
+
+    def check_testv(self, testv):
+        test_good = True
+        for (offset, length, operator, specimen) in testv:
+            data = ""
+            if not testv_compare(data, operator, specimen):
+                test_good = False
+                break
+        return test_good
+
+def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
+    ms = MutableShareFile(filename, parent)
+    ms.create(my_nodeid, write_enabler)
+    del ms
+    return MutableShareFile(filename, parent)
+
diff --git a/src/allmydata/storage/backends/null/__init__.py b/src/allmydata/storage/backends/null/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py
deleted file mode 100644 (file)
index 780b04a..0000000
+++ /dev/null
@@ -1,250 +0,0 @@
-
-import os, struct, time
-
-from foolscap.api import Referenceable
-
-from zope.interface import implements
-from allmydata.interfaces import RIBucketWriter, RIBucketReader
-from allmydata.util import base32, fileutil, log
-from allmydata.util.fileutil import get_used_space
-from allmydata.util.assertutil import precondition
-from allmydata.storage.common import UnknownImmutableContainerVersionError, \
-     DataTooLargeError
-from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
-
-
-# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
-# can be accessed by RIBucketWriter.write and RIBucketReader.read .
-
-# The share file has the following layout:
-#  0x00: share file version number, four bytes, current version is 1
-#  0x04: share data length, four bytes big-endian     # Footnote 1
-#  0x08: number of leases, four bytes big-endian = N  # Footnote 2
-#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
-#  filesize - 72*N: leases (ignored). Each lease is 72 bytes.
-
-# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers.
-
-# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers.
-# New shares will have a 0 here. Old shares will have whatever value was left
-# over when the server was upgraded. All lease information is now kept in the
-# leasedb.
-
-
-class ShareFile:
-    sharetype = "immutable"
-    LEASE_SIZE = struct.calcsize(">L32s32sL")
-    HEADER = ">LLL"
-    HEADER_SIZE = struct.calcsize(HEADER)
-    DATA_OFFSET = HEADER_SIZE
-
-    def __init__(self, filename, max_size=None, create=False):
-        """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
-        precondition((max_size is not None) or (not create), max_size, create)
-        self.home = filename
-        self._max_size = max_size
-        if create:
-            # touch the file, so later callers will see that we're working on
-            # it. Also construct the metadata.
-            assert not os.path.exists(self.home)
-            fileutil.make_dirs(os.path.dirname(self.home))
-            f = open(self.home, 'wb')
-            # The second field -- the four-byte share data length -- is no
-            # longer used as of Tahoe v1.3.0, but we continue to write it in
-            # there in case someone downgrades a storage server from >=
-            # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
-            # server to another, etc. We do saturation -- a share data length
-            # larger than 2**32-1 (what can fit into the field) is marked as
-            # the largest length that can fit into the field. That way, even
-            # if this does happen, the old < v1.3.0 server will still allow
-            # clients to read the first part of the share.
-            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
-            f.close()
-            self._data_length = max_size
-        else:
-            f = open(self.home, 'rb')
-            try:
-                (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
-            finally:
-                f.close()
-            if version != 1:
-                msg = "sharefile %s had version %d but we wanted 1" % \
-                      (filename, version)
-                raise UnknownImmutableContainerVersionError(msg)
-
-            filesize = os.stat(self.home).st_size
-            self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
-
-        # TODO: raise a better exception.
-        assert self._data_length >= 0, self._data_length
-
-    def get_used_space(self):
-        return get_used_space(self.home)
-
-    def unlink(self):
-        os.unlink(self.home)
-
-    def get_size(self):
-        return os.stat(self.home).st_size
-
-    def read_share_data(self, offset, length):
-        precondition(offset >= 0)
-
-        # Reads beyond the end of the data are truncated. Reads that start
-        # beyond the end of the data return an empty string.
-        seekpos = self.DATA_OFFSET + offset
-        actuallength = max(0, min(length, self._data_length - offset))
-        if actuallength == 0:
-            return ""
-        f = open(self.home, 'rb')
-        try:
-            f.seek(seekpos)
-            return f.read(actuallength)
-        finally:
-            f.close()
-
-    def write_share_data(self, offset, data):
-        length = len(data)
-        precondition(offset >= 0, offset)
-        if self._max_size is not None and offset+length > self._max_size:
-            raise DataTooLargeError(self._max_size, offset, length)
-        f = open(self.home, 'rb+')
-        try:
-            real_offset = self.DATA_OFFSET + offset
-            f.seek(real_offset)
-            assert f.tell() == real_offset
-            f.write(data)
-        finally:
-            f.close()
-
-
-class BucketWriter(Referenceable):
-    implements(RIBucketWriter)
-
-    def __init__(self, ss, account, storage_index, shnum,
-                 incominghome, finalhome, max_size, canary):
-        self.ss = ss
-        self.incominghome = incominghome
-        self.finalhome = finalhome
-        self._max_size = max_size # don't allow the client to write more than this
-        self._account = account
-        self._storage_index = storage_index
-        self._shnum = shnum
-        self._canary = canary
-        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
-        self.closed = False
-        self.throw_out_all_data = False
-        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
-        self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
-
-    def allocated_size(self):
-        return self._max_size
-
-    def remote_write(self, offset, data):
-        start = time.time()
-        precondition(not self.closed)
-        if self.throw_out_all_data:
-            return
-        self._sharefile.write_share_data(offset, data)
-        self.ss.add_latency("write", time.time() - start)
-        self.ss.count("write")
-
-    def remote_close(self):
-        precondition(not self.closed)
-        start = time.time()
-
-        fileutil.make_dirs(os.path.dirname(self.finalhome))
-        fileutil.rename(self.incominghome, self.finalhome)
-        try:
-            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
-            # We try to delete the parent (.../ab/abcde) to avoid leaving
-            # these directories lying around forever, but the delete might
-            # fail if we're working on another share for the same storage
-            # index (like ab/abcde/5). The alternative approach would be to
-            # use a hierarchy of objects (PrefixHolder, BucketHolder,
-            # ShareWriter), each of which is responsible for a single
-            # directory on disk, and have them use reference counting of
-            # their children to know when they should do the rmdir. This
-            # approach is simpler, but relies on os.rmdir refusing to delete
-            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
-            os.rmdir(os.path.dirname(self.incominghome))
-            # we also delete the grandparent (prefix) directory, .../ab ,
-            # again to avoid leaving directories lying around. This might
-            # fail if there is another bucket open that shares a prefix (like
-            # ab/abfff).
-            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
-            # we leave the great-grandparent (incoming/) directory in place.
-        except EnvironmentError:
-            # ignore the "can't rmdir because the directory is not empty"
-            # exceptions, those are normal consequences of the
-            # above-mentioned conditions.
-            pass
-        self._sharefile = None
-        self.closed = True
-        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
-        filelen = get_used_space(self.finalhome)
-        self.ss.bucket_writer_closed(self, filelen)
-        self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
-        self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
-        self.ss.add_latency("close", time.time() - start)
-        self.ss.count("close")
-
-    def _disconnected(self):
-        if not self.closed:
-            self._abort()
-
-    def remote_abort(self):
-        log.msg("storage: aborting sharefile %s" % self.incominghome,
-                facility="tahoe.storage", level=log.UNUSUAL)
-        if not self.closed:
-            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-        self._abort()
-        self.ss.count("abort")
-
-    def _abort(self):
-        if self.closed:
-            return
-
-        os.remove(self.incominghome)
-        # if we were the last share to be moved, remove the incoming/
-        # directory that was our parent
-        parentdir = os.path.split(self.incominghome)[0]
-        if not os.listdir(parentdir):
-            os.rmdir(parentdir)
-        self._sharefile = None
-        self._account.remove_share_and_leases(self._storage_index, self._shnum)
-
-        # We are now considered closed for further writing. We must tell
-        # the storage server about this so that it stops expecting us to
-        # use the space it allocated for us earlier.
-        self.closed = True
-        self.ss.bucket_writer_closed(self, 0)
-
-
-class BucketReader(Referenceable):
-    implements(RIBucketReader)
-
-    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
-        self.ss = ss
-        self._share_file = ShareFile(sharefname)
-        self.storage_index = storage_index
-        self.shnum = shnum
-
-    def __repr__(self):
-        return "<%s %s %s>" % (self.__class__.__name__,
-                               base32.b2a_l(self.storage_index[:8], 60),
-                               self.shnum)
-
-    def remote_read(self, offset, length):
-        start = time.time()
-        data = self._share_file.read_share_data(offset, length)
-        self.ss.add_latency("read", time.time() - start)
-        self.ss.count("read")
-        return data
-
-    def remote_advise_corrupt_share(self, reason):
-        return self.ss.client_advise_corrupt_share("immutable",
-                                                   self.storage_index,
-                                                   self.shnum,
-                                                   reason)
diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py
deleted file mode 100644 (file)
index 4d83495..0000000
+++ /dev/null
@@ -1,255 +0,0 @@
-
-import os, struct
-
-from allmydata.interfaces import BadWriteEnablerError
-from allmydata.util import idlib, log
-from allmydata.util.assertutil import precondition
-from allmydata.util.hashutil import timing_safe_compare
-from allmydata.util.fileutil import get_used_space
-from allmydata.storage.common import UnknownMutableContainerVersionError, \
-     DataTooLargeError
-from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-
-
-# the MutableShareFile is like the ShareFile, but used for mutable data. It
-# has a different layout. See docs/mutable.txt for more details.
-
-# #   offset    size    name
-# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
-# 2   32        20      write enabler's nodeid
-# 3   52        32      write enabler
-# 4   84        8       data size (actual share data present) (a)
-# 5   92        8       offset of (8) count of extra leases (after data)
-# 6   100       368     four leases, 92 bytes each
-#                        0    4   ownerid (0 means "no lease here")
-#                        4    4   expiration timestamp
-#                        8   32   renewal token
-#                        40  32   cancel token
-#                        72  20   nodeid which accepted the tokens
-# 7   468       (a)     data
-# 8   ??        4       count of extra leases
-# 9   ??        n*92    extra leases
-
-
-# The struct module doc says that L's are 4 bytes in size., and that Q's are
-# 8 bytes in size. Since compatibility depends upon this, double-check it.
-assert struct.calcsize(">L") == 4, struct.calcsize(">L")
-assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
-
-class MutableShareFile:
-
-    sharetype = "mutable"
-    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
-    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
-    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
-    assert LEASE_SIZE == 92
-    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
-    assert DATA_OFFSET == 468, DATA_OFFSET
-    # our sharefiles share with a recognizable string, plus some random
-    # binary data to reduce the chance that a regular text file will look
-    # like a sharefile.
-    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
-    assert len(MAGIC) == 32
-    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
-    # TODO: decide upon a policy for max share size
-
-    def __init__(self, filename, parent=None):
-        self.home = filename
-        if os.path.exists(self.home):
-            # we don't cache anything, just check the magic
-            f = open(self.home, 'rb')
-            data = f.read(self.HEADER_SIZE)
-            (magic,
-             write_enabler_nodeid, write_enabler,
-             data_length, extra_lease_offset) = \
-             struct.unpack(">32s20s32sQQ", data)
-            if magic != self.MAGIC:
-                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
-                      (filename, magic, self.MAGIC)
-                raise UnknownMutableContainerVersionError(msg)
-        self.parent = parent # for logging
-
-    def log(self, *args, **kwargs):
-        return self.parent.log(*args, **kwargs)
-
-    def create(self, my_nodeid, write_enabler):
-        assert not os.path.exists(self.home)
-        data_length = 0
-        extra_lease_offset = (self.HEADER_SIZE
-                              + 4 * self.LEASE_SIZE
-                              + data_length)
-        assert extra_lease_offset == self.DATA_OFFSET # true at creation
-        num_extra_leases = 0
-        f = open(self.home, 'wb')
-        header = struct.pack(">32s20s32sQQ",
-                             self.MAGIC, my_nodeid, write_enabler,
-                             data_length, extra_lease_offset,
-                             )
-        leases = ("\x00"*self.LEASE_SIZE) * 4
-        f.write(header + leases)
-        # data goes here, empty after creation
-        f.write(struct.pack(">L", num_extra_leases))
-        # extra leases go here, none at creation
-        f.close()
-
-    def get_used_space(self):
-        return get_used_space(self.home)
-
-    def unlink(self):
-        os.unlink(self.home)
-
-    def get_size(self):
-        return os.stat(self.home).st_size
-
-    def _read_data_length(self, f):
-        f.seek(self.DATA_LENGTH_OFFSET)
-        (data_length,) = struct.unpack(">Q", f.read(8))
-        return data_length
-
-    def _read_container_size(self, f):
-        f.seek(self.DATA_LENGTH_OFFSET + 8)
-        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
-        return extra_lease_offset - self.DATA_OFFSET
-
-    def _write_data_length(self, f, data_length):
-        extra_lease_offset = self.DATA_OFFSET + data_length
-        f.seek(self.DATA_LENGTH_OFFSET)
-        f.write(struct.pack(">QQ", data_length, extra_lease_offset))
-        f.seek(extra_lease_offset)
-        f.write(struct.pack(">L", 0))
-
-    def _read_share_data(self, f, offset, length):
-        precondition(offset >= 0)
-        data_length = self._read_data_length(f)
-        if offset+length > data_length:
-            # reads beyond the end of the data are truncated. Reads that
-            # start beyond the end of the data return an empty string.
-            length = max(0, data_length-offset)
-        if length == 0:
-            return ""
-        precondition(offset+length <= data_length)
-        f.seek(self.DATA_OFFSET+offset)
-        data = f.read(length)
-        return data
-
-    def _write_share_data(self, f, offset, data):
-        length = len(data)
-        precondition(offset >= 0)
-        data_length = self._read_data_length(f)
-
-        if offset+length >= data_length:
-            # They are expanding their data size.
-
-            if offset+length > self.MAX_SIZE:
-                raise DataTooLargeError()
-
-            # Their data now fits in the current container. We must write
-            # their new data and modify the recorded data size.
-
-            # Fill any newly exposed empty space with 0's.
-            if offset > data_length:
-                f.seek(self.DATA_OFFSET+data_length)
-                f.write('\x00'*(offset - data_length))
-                f.flush()
-
-            new_data_length = offset+length
-            self._write_data_length(f, new_data_length)
-            # an interrupt here will result in a corrupted share
-
-        # now all that's left to do is write out their data
-        f.seek(self.DATA_OFFSET+offset)
-        f.write(data)
-        return
-
-    def _read_write_enabler_and_nodeid(self, f):
-        f.seek(0)
-        data = f.read(self.HEADER_SIZE)
-        (magic,
-         write_enabler_nodeid, write_enabler,
-         data_length, extra_least_offset) = \
-         struct.unpack(">32s20s32sQQ", data)
-        assert magic == self.MAGIC
-        return (write_enabler, write_enabler_nodeid)
-
-    def readv(self, readv):
-        datav = []
-        f = open(self.home, 'rb')
-        for (offset, length) in readv:
-            datav.append(self._read_share_data(f, offset, length))
-        f.close()
-        return datav
-
-    def check_write_enabler(self, write_enabler, si_s):
-        f = open(self.home, 'rb+')
-        (real_write_enabler, write_enabler_nodeid) = \
-                             self._read_write_enabler_and_nodeid(f)
-        f.close()
-        # avoid a timing attack
-        #if write_enabler != real_write_enabler:
-        if not timing_safe_compare(write_enabler, real_write_enabler):
-            # accomodate share migration by reporting the nodeid used for the
-            # old write enabler.
-            self.log(format="bad write enabler on SI %(si)s,"
-                     " recorded by nodeid %(nodeid)s",
-                     facility="tahoe.storage",
-                     level=log.WEIRD, umid="cE1eBQ",
-                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
-            msg = "The write enabler was recorded by nodeid '%s'." % \
-                  (idlib.nodeid_b2a(write_enabler_nodeid),)
-            raise BadWriteEnablerError(msg)
-
-    def check_testv(self, testv):
-        test_good = True
-        f = open(self.home, 'rb+')
-        for (offset, length, operator, specimen) in testv:
-            data = self._read_share_data(f, offset, length)
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        f.close()
-        return test_good
-
-    def writev(self, datav, new_length):
-        f = open(self.home, 'rb+')
-        for (offset, data) in datav:
-            self._write_share_data(f, offset, data)
-        if new_length is not None:
-            cur_length = self._read_data_length(f)
-            if new_length < cur_length:
-                self._write_data_length(f, new_length)
-                # TODO: shrink the share file.
-        f.close()
-
-def testv_compare(a, op, b):
-    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
-    if op == "lt":
-        return a < b
-    if op == "le":
-        return a <= b
-    if op == "eq":
-        return a == b
-    if op == "ne":
-        return a != b
-    if op == "ge":
-        return a >= b
-    if op == "gt":
-        return a > b
-    # never reached
-
-class EmptyShare:
-
-    def check_testv(self, testv):
-        test_good = True
-        for (offset, length, operator, specimen) in testv:
-            data = ""
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        return test_good
-
-def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
-    ms = MutableShareFile(filename, parent)
-    ms.create(my_nodeid, write_enabler)
-    del ms
-    return MutableShareFile(filename, parent)
-
index 857ea21a30464374d1c16f02dfae34b8349822e0..2403e11bdb2a73edb4cba90401537684b665e869 100644 (file)
@@ -10,10 +10,10 @@ import allmydata # for __full_version__
 
 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
-from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
+from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \
      create_mutable_sharefile
 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+from allmydata.storage.backends.disk.immutable import ShareFile, BucketWriter, BucketReader
 from allmydata.storage.crawler import BucketCountingCrawler
 from allmydata.storage.accountant import Accountant
 from allmydata.storage.expiration import ExpirationPolicy
index 558bddc197714d8fc53a30a00a86e80bae04375c..bcdf98c40b166c111ee1e1883b7326da5cd0d7be 100644 (file)
@@ -1,7 +1,7 @@
 #! /usr/bin/python
 
-from allmydata.storage.mutable import MutableShareFile
-from allmydata.storage.immutable import ShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.immutable import ShareFile
 
 def get_share_file(filename):
     f = open(filename, "rb")
index 311a82c6ff400536ca23107edcf6ecbc6ce659a2..fe4d2aa0b58f64bc3b7c8a1df0f7728ea33cdc8c 100644 (file)
@@ -17,7 +17,7 @@ from allmydata.check_results import CheckResults, CheckAndRepairResults, \
 from allmydata.storage_client import StubServer
 from allmydata.mutable.layout import unpack_header
 from allmydata.mutable.publish import MutableData
-from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
 from allmydata.util import hashutil, log, fileutil, pollmixin
 from allmydata.util.assertutil import precondition
 from allmydata.util.consumer import download_to_data
index 150ba9bce16c7fb873ed585505dc414814291214..ab515fc2bbbb1a6f4f062d28174e862aa9f1b9b1 100644 (file)
@@ -12,8 +12,8 @@ import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil, base32, time_format
 from allmydata.storage.server import StorageServer
-from allmydata.storage.mutable import MutableShareFile
-from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.immutable import BucketWriter, BucketReader, ShareFile
 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
 from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE
index 0f35f4ea753795e77bcc9fdec31837308e81c0eb..fe00fea335d140420c1a55fbbe2e8f2c10d2890d 100644 (file)
@@ -8,7 +8,7 @@ from twisted.internet import threads # CLI tests use deferToThread
 
 import allmydata
 from allmydata import uri
-from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
 from allmydata.storage.server import si_a2b
 from allmydata.immutable import offloaded, upload
 from allmydata.immutable.literal import LiteralFileNode