Add new files for cloud merge (rebased).
author Daira Hopwood <daira@jacaranda.org>
Wed, 9 Apr 2014 00:33:33 +0000 (01:33 +0100)
committer Daira Hopwood <daira@jacaranda.org>
Fri, 16 Oct 2015 16:21:01 +0000 (17:21 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/storage/backends/base.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/cloud_backend.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/cloud_common.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/immutable.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/mock_cloud.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/mutable.py [new file with mode: 0644]
src/allmydata/storage/backends/cloud/s3/s3_container.py [new file with mode: 0644]
src/allmydata/storage/backends/disk/disk_backend.py [new file with mode: 0644]
src/allmydata/storage/backends/null/null_backend.py [new file with mode: 0644]

diff --git a/src/allmydata/storage/backends/base.py b/src/allmydata/storage/backends/base.py
new file mode 100644 (file)
index 0000000..b56266c
--- /dev/null
@@ -0,0 +1,208 @@
+
+from twisted.application import service
+from twisted.internet import defer
+
+from allmydata.util.deferredutil import async_iterate, gatherResults
+from allmydata.storage.common import si_b2a
+from allmydata.storage.bucket import BucketReader
+from allmydata.storage.leasedb import SHARETYPE_MUTABLE
+
+
+class Backend(service.MultiService):
+    def __init__(self):
+        service.MultiService.__init__(self)
+
+    def must_use_tubid_as_permutation_seed(self):
+        # New backends cannot have been around before #466, and so have no backward
+        # compatibility requirements for permutation seeds. The disk backend overrides this.
+        return False
+
+
+class ShareSet(object):
+    """
+    This class implements shareset logic that should work for all backends, but
+    which subclasses may override for efficiency.
+    """
+    # TODO: queue operations on a shareset to ensure atomicity for each fully
+    # successful operation (#1869).
+
+    def __init__(self, storage_index):
+        self.storage_index = storage_index
+
+    def get_storage_index(self):
+        return self.storage_index
+
+    def get_storage_index_string(self):
+        return si_b2a(self.storage_index)
+
+    def make_bucket_reader(self, account, share):
+        return BucketReader(account, share)
+
+    def testv_and_readv_and_writev(self, write_enabler,
+                                   test_and_write_vectors, read_vector,
+                                   expiration_time, account):
+        # The implementation here depends on the following helper methods,
+        # which must be provided by subclasses:
+        #
+        # def _clean_up_after_unlink(self):
+        #     """clean up resources associated with the shareset after some
+        #     shares might have been deleted"""
+        #
+        # def _create_mutable_share(self, account, shnum, write_enabler):
+        #     """create a mutable share with the given shnum and write_enabler"""
+
+        sharemap = {}
+        d = self.get_shares()
+        def _got_shares( (shares, corrupted) ):
+            d2 = defer.succeed(None)
+            for share in shares:
+                assert not isinstance(share, defer.Deferred), share
+                # XXX is it correct to ignore immutable shares? Maybe get_shares should
+                # have a parameter saying what type it's expecting.
+                if share.sharetype == "mutable":
+                    d2.addCallback(lambda ign, share=share: share.check_write_enabler(write_enabler))
+                    sharemap[share.get_shnum()] = share
+
+            shnums = sorted(sharemap.keys())
+
+            # if d2 does not fail, write_enabler is good for all existing shares
+
+            # now evaluate test vectors
+            def _check_testv(shnum):
+                (testv, datav, new_length) = test_and_write_vectors[shnum]
+                if shnum in sharemap:
+                    d3 = sharemap[shnum].check_testv(testv)
+                elif shnum in corrupted:
+                    # a corrupted share does not match any test vector
+                    d3 = defer.succeed(False)
+                else:
+                    # compare the vectors against an empty share, in which all
+                    # reads return empty strings
+                    d3 = defer.succeed(empty_check_testv(testv))
+
+                def _check_result(res):
+                    if not res:
+                        account.server.log("testv failed: [%d] %r" % (shnum, testv))
+                    return res
+                d3.addCallback(_check_result)
+                return d3
+
+            d2.addCallback(lambda ign: async_iterate(_check_testv, test_and_write_vectors))
+
+            def _gather(testv_is_good):
+                # Gather the read vectors, before we do any writes. This ignores any
+                # corrupted shares.
+                d3 = gatherResults([sharemap[shnum].readv(read_vector) for shnum in shnums])
+
+                def _do_writes(reads):
+                    read_data = {}
+                    for i in range(len(shnums)):
+                        read_data[shnums[i]] = reads[i]
+
+                    d4 = defer.succeed(None)
+                    if testv_is_good:
+                        if len(set(test_and_write_vectors.keys()) & corrupted) > 0:
+                            # XXX think of a better exception to raise
+                            raise AssertionError("You asked to write share numbers %r of storage index %r, "
+                                                 "but one or more of those is corrupt (numbers %r)"
+                                                 % (list(sorted(test_and_write_vectors.keys())),
+                                                    self.get_storage_index_string(),
+                                                    list(sorted(corrupted))) )
+
+                        # now apply the write vectors
+                        for shnum in test_and_write_vectors:
+                            (testv, datav, new_length) = test_and_write_vectors[shnum]
+                            if new_length == 0:
+                                if shnum in sharemap:
+                                    d4.addCallback(lambda ign, shnum=shnum:
+                                                   sharemap[shnum].unlink())
+                                    d4.addCallback(lambda ign, shnum=shnum:
+                                                   account.remove_share_and_leases(self.storage_index, shnum))
+                            else:
+                                if shnum not in sharemap:
+                                    # allocate a new share
+                                    d4.addCallback(lambda ign, shnum=shnum:
+                                                   self._create_mutable_share(account, shnum,
+                                                                              write_enabler))
+                                    def _record_share(share, shnum=shnum):
+                                        sharemap[shnum] = share
+                                        account.add_share(self.storage_index, shnum, share.get_used_space(),
+                                                          SHARETYPE_MUTABLE)
+                                    d4.addCallback(_record_share)
+                                d4.addCallback(lambda ign, shnum=shnum, datav=datav, new_length=new_length:
+                                               sharemap[shnum].writev(datav, new_length))
+                                def _update_lease(ign, shnum=shnum):
+                                    account.add_or_renew_default_lease(self.storage_index, shnum)
+                                    account.mark_share_as_stable(self.storage_index, shnum,
+                                                                 sharemap[shnum].get_used_space())
+                                d4.addCallback(_update_lease)
+
+                        if new_length == 0:
+                            d4.addCallback(lambda ign: self._clean_up_after_unlink())
+
+                    d4.addCallback(lambda ign: (testv_is_good, read_data))
+                    return d4
+                d3.addCallback(_do_writes)
+                return d3
+            d2.addCallback(_gather)
+            return d2
+        d.addCallback(_got_shares)
+        return d
+
+    def readv(self, wanted_shnums, read_vector):
+        """
+        Read a vector from the numbered shares in this shareset. An empty
+        wanted_shnums list means to return data from all known shares.
+
+        @param wanted_shnums=ListOf(int)
+        @param read_vector=ReadVector
+        @return DictOf(int, ReadData): shnum -> results, with one key per share
+        """
+        shnums = []
+        dreads = []
+        d = self.get_shares()
+        def _got_shares( (shares, corrupted) ):
+            # We ignore corrupted shares.
+            for share in shares:
+                assert not isinstance(share, defer.Deferred), share
+                shnum = share.get_shnum()
+                if not wanted_shnums or shnum in wanted_shnums:
+                    shnums.append(share.get_shnum())
+                    dreads.append(share.readv(read_vector))
+            return gatherResults(dreads)
+        d.addCallback(_got_shares)
+
+        def _got_reads(reads):
+            datavs = {}
+            for i in range(len(shnums)):
+                datavs[shnums[i]] = reads[i]
+            return datavs
+        d.addCallback(_got_reads)
+        return d
+
+
+def testv_compare(a, op, b):
+    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+    if op == "lt":
+        return a < b
+    if op == "le":
+        return a <= b
+    if op == "eq":
+        return a == b
+    if op == "ne":
+        return a != b
+    if op == "ge":
+        return a >= b
+    if op == "gt":
+        return a > b
+    # never reached
+
+
+def empty_check_testv(testv):
+    test_good = True
+    for (offset, length, operator, specimen) in testv:
+        data = ""
+        if not testv_compare(data, operator, specimen):
+            test_good = False
+            break
+    return test_good
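+
+# Examples (illustrative) of the test-vector helpers above:
+#   testv_compare("abc", "eq", "abc")        -> True
+#   empty_check_testv([(0, 3, "eq", "")])    -> True   (reads from an empty share return "")
+#   empty_check_testv([(0, 3, "eq", "abc")]) -> False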
diff --git a/src/allmydata/storage/backends/cloud/cloud_backend.py b/src/allmydata/storage/backends/cloud/cloud_backend.py
new file mode 100644 (file)
index 0000000..4fcb0dc
--- /dev/null
@@ -0,0 +1,174 @@
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet
+
+from allmydata.node import InvalidValueError
+from allmydata.util.deferredutil import gatherResults
+from allmydata.util.assertutil import _assert
+from allmydata.util.dictutil import NumDict
+from allmydata.util.encodingutil import quote_output
+from allmydata.storage.common import si_a2b, NUM_RE
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.backends.base import Backend, ShareSet
+from allmydata.storage.backends.cloud.immutable import ImmutableCloudShareForReading, ImmutableCloudShareForWriting
+from allmydata.storage.backends.cloud.mutable import MutableCloudShare
+from allmydata.storage.backends.cloud.cloud_common import get_share_key, delete_chunks
+from allmydata.mutable.layout import MUTABLE_MAGIC
+
+
+def get_cloud_share(container, storage_index, shnum, total_size):
+    key = get_share_key(storage_index, shnum)
+    d = container.get_object(key)
+    def _make_share(first_chunkdata):
+        if first_chunkdata.startswith(MUTABLE_MAGIC):
+            return MutableCloudShare(container, storage_index, shnum, total_size, first_chunkdata)
+        else:
+            # assume it's immutable
+            return ImmutableCloudShareForReading(container, storage_index, shnum, total_size, first_chunkdata)
+    d.addCallback(_make_share)
+    return d
+
+
+def configure_cloud_backend(storedir, config):
+    # REMIND: when multiple container implementations are supported, only import the container we're going to use.
+    from allmydata.storage.backends.cloud.s3.s3_container import configure_s3_container
+
+    if config.get_config("storage", "readonly", False, boolean=True):
+        raise InvalidValueError("[storage]readonly is not supported by the cloud backend; "
+                                "make the container read-only instead.")
+
+    backendtype = config.get_config("storage", "backend", "disk")
+    if backendtype == "s3":
+        backendtype = "cloud.s3"
+
+    container_configurators = {
+        'cloud.s3': configure_s3_container,
+    }
+
+    if backendtype not in container_configurators:
+        raise InvalidValueError("%s is not supported by the cloud backend; it must be one of %s"
+                                % (quote_output("[storage]backend = " + backendtype), container_configurators.keys()) )
+
+    container = container_configurators[backendtype](storedir, config)
+    return CloudBackend(container)
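+
+# Illustrative tahoe.cfg stanza accepted by configure_cloud_backend above; the S3
+# credentials and container options are read by configure_s3_container and are not
+# shown here:
+#
+#   [storage]
+#   enabled = true
+#   backend = cloud.s3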
+
+
+class CloudBackend(Backend):
+    implements(IStorageBackend)
+
+    def __init__(self, container):
+        Backend.__init__(self)
+        self._container = container
+
+        # set of (storage_index, shnum) of incoming shares
+        self._incomingset = set()
+
+    def get_sharesets_for_prefix(self, prefix):
+        d = self._container.list_objects(prefix='shares/%s/' % (prefix,))
+        def _get_sharesets(res):
+            # XXX this enumerates all shares to get the set of SIs.
+            # Is there a way to enumerate SIs more efficiently?
+            si_strings = set()
+            for item in res.contents:
+                # XXX better error handling
+                path = item.key.split('/')
+                _assert(path[0:2] == ["shares", prefix], path=path, prefix=prefix)
+                si_strings.add(path[2])
+
+            # XXX we want this to be deterministic, so we return the sharesets sorted
+            # by their si_strings, but we shouldn't need to explicitly re-sort them
+            # because list_objects returns a sorted list.
+            return [CloudShareSet(si_a2b(s), self._container, self._incomingset) for s in sorted(si_strings)]
+        d.addCallback(_get_sharesets)
+        return d
+
+    def get_shareset(self, storage_index):
+        return CloudShareSet(storage_index, self._container, self._incomingset)
+
+    def fill_in_space_stats(self, stats):
+        # TODO: query space usage of container if supported.
+        # TODO: query whether the container is read-only and set
+        # accepting_immutable_shares accordingly.
+        stats['storage_server.accepting_immutable_shares'] = 1
+
+    def get_available_space(self):
+        # TODO: query space usage of container if supported.
+        return 2**64
+
+
+class CloudShareSet(ShareSet):
+    implements(IShareSet)
+
+    def __init__(self, storage_index, container, incomingset):
+        ShareSet.__init__(self, storage_index)
+        self._container = container
+        self._incomingset = incomingset
+        self._key = get_share_key(storage_index)
+
+    def get_overhead(self):
+        return 0
+
+    def get_shares(self):
+        d = self._container.list_objects(prefix=self._key)
+        def _get_shares(res):
+            si = self.get_storage_index()
+            shnum_to_total_size = NumDict()
+            for item in res.contents:
+                key = item.key
+                _assert(key.startswith(self._key), key=key, self_key=self._key)
+                path = key.split('/')
+                if len(path) == 4:
+                    (shnumstr, _, chunknumstr) = path[3].partition('.')
+                    chunknumstr = chunknumstr or '0'
+                    if NUM_RE.match(shnumstr) and NUM_RE.match(chunknumstr):
+                        # The size is taken as the sum of sizes for all chunks, but for simplicity
+                        # we don't check here that the individual chunk sizes match expectations.
+                        # If they don't, that will cause an error on reading.
+                        shnum_to_total_size.add_num(int(shnumstr), int(item.size))
+
+            return gatherResults([get_cloud_share(self._container, si, shnum, total_size)
+                                  for (shnum, total_size) in shnum_to_total_size.items_sorted_by_key()])
+        d.addCallback(_get_shares)
+        # TODO: return information about corrupt shares.
+        d.addCallback(lambda shares: (shares, set()) )
+        return d
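+
+    # Worked example (illustrative): a listing whose keys are <self._key>3 (size 524288),
+    # <self._key>3.1 (size 1000), and <self._key>7 (size 4096) yields shares for shnums
+    # 3 and 7, with total sizes 525288 and 4096 respectively.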
+
+    def get_share(self, shnum):
+        key = "%s%d" % (self._key, shnum)
+        d = self._container.list_objects(prefix=key)
+        def _get_share(res):
+            total_size = 0
+            for item in res.contents:
+                total_size += item.size
+            return get_cloud_share(self._container, self.get_storage_index(), shnum, total_size)
+        d.addCallback(_get_share)
+        return d
+
+    def delete_share(self, shnum):
+        key = "%s%d" % (self._key, shnum)
+        return delete_chunks(self._container, key)
+
+    def has_incoming(self, shnum):
+        return (self.get_storage_index(), shnum) in self._incomingset
+
+    def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+        immsh = ImmutableCloudShareForWriting(self._container, self.get_storage_index(), shnum,
+                                              allocated_data_length, self._incomingset)
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: BucketWriter(account, immsh, canary))
+        return d
+
+    def _create_mutable_share(self, account, shnum, write_enabler):
+        serverid = account.server.get_serverid()
+        return MutableCloudShare.create_empty_share(self._container, serverid, write_enabler,
+                                                    self.get_storage_index(), shnum, parent=account.server)
+
+    def _clean_up_after_unlink(self):
+        pass
+
+    def _get_sharedir(self):
+        # For use by tests, only with the mock cloud backend.
+        # It is OK that _get_path doesn't exist on real container objects.
+        return self._container._get_path(self._key)
diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py
new file mode 100644 (file)
index 0000000..d1d5b18
--- /dev/null
@@ -0,0 +1,507 @@
+
+from collections import deque
+
+from twisted.internet import defer, reactor, task
+from twisted.python.failure import Failure
+
+from zope.interface import Interface, implements
+from allmydata.interfaces import IShareBase
+
+from allmydata.util import log
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.deferredutil import eventually_callback, eventually_errback, eventual_chain, gatherResults
+from allmydata.storage.common import si_b2a, NUM_RE
+
+
+# The container has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK
+
+def get_share_key(si, shnum=None):
+    sistr = si_b2a(si)
+    if shnum is None:
+        return "shares/%s/%s/" % (sistr[:2], sistr)
+    else:
+        return "shares/%s/%s/%d" % (sistr[:2], sistr, shnum)
+
+def get_chunk_key(share_key, chunknum):
+    precondition(chunknum >= 0, chunknum=chunknum)
+    if chunknum == 0:
+        return share_key
+    else:
+        return "%s.%d" % (share_key, chunknum)
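+
+# Examples (illustrative, using a made-up storage index whose base32 form is "dglev..."):
+#   get_share_key(si)                        -> "shares/dg/dglev.../"
+#   get_share_key(si, 3)                     -> "shares/dg/dglev.../3"
+#   get_chunk_key("shares/dg/dglev.../3", 0) -> "shares/dg/dglev.../3"   (chunk 0 has no suffix)
+#   get_chunk_key("shares/dg/dglev.../3", 2) -> "shares/dg/dglev.../3.2"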
+
+
+PREFERRED_CHUNK_SIZE = 512*1024
+PIPELINE_DEPTH = 4
+
+ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE
+
+def get_zero_chunkdata(size):
+    if size <= PREFERRED_CHUNK_SIZE:
+        return ZERO_CHUNKDATA[: size]
+    else:
+        return "\x00"*size
+
+
+class IContainer(Interface):
+    """
+    I represent a cloud container.
+    """
+    def create():
+        """
+        Create this container.
+        """
+
+    def delete():
+        """
+        Delete this container.
+        The cloud service may require the container to be empty before it can be deleted.
+        """
+
+    def list_objects(prefix=''):
+        """
+        Get a ContainerListing that lists objects in this container.
+
+        prefix: (str) limit the returned keys to those starting with prefix.
+        """
+
+    def put_object(object_name, data, content_type=None, metadata={}):
+        """
+        Put an object into this container.
+        Any existing object of the same name will be replaced.
+        """
+
+    def get_object(object_name):
+        """
+        Get an object from this container.
+        """
+
+    def head_object(object_name):
+        """
+        Retrieve object metadata only.
+        """
+
+    def delete_object(object_name):
+        """
+        Delete an object from this container.
+        Once deleted, there is no method to restore or undelete an object.
+        """
+
+
+def delete_chunks(container, share_key, from_chunknum=0):
+    d = container.list_objects(prefix=share_key)
+    def _delete(res):
+        def _suppress_404(f):
+            e = f.trap(container.ServiceError)
+            if e.get_error_code() != 404:
+                return f
+
+        d2 = defer.succeed(None)
+        for item in res.contents:
+            key = item.key
+            _assert(key.startswith(share_key), key=key, share_key=share_key)
+            path = key.split('/')
+            if len(path) == 4:
+                (_, _, chunknumstr) = path[3].partition('.')
+                chunknumstr = chunknumstr or "0"
+                if NUM_RE.match(chunknumstr) and int(chunknumstr) >= from_chunknum:
+                    d2.addCallback(lambda ign, key=key: container.delete_object(key))
+                    d2.addErrback(_suppress_404)
+        return d2
+    d.addCallback(_delete)
+    return d
+
+
+class CloudShareBase(object):
+    implements(IShareBase)
+    """
+    Attributes:
+      _container:     (IContainer) the cloud container that stores this share
+      _storage_index: (str) binary storage index
+      _shnum:         (integer) share number
+      _key:           (str) the key prefix under which this share will be stored (no .chunknum suffix)
+      _data_length:   (integer) length of data excluding headers and leases
+      _total_size:    (integer) total size of the sharefile
+
+    Methods:
+      _discard(self): object will no longer be used; discard references to potentially large data
+    """
+    def __init__(self, container, storage_index, shnum):
+        precondition(IContainer.providedBy(container), container=container)
+        precondition(isinstance(storage_index, str), storage_index=storage_index)
+        precondition(isinstance(shnum, int), shnum=shnum)
+
+        # These are always known immediately.
+        self._container = container
+        self._storage_index = storage_index
+        self._shnum = shnum
+        self._key = get_share_key(storage_index, shnum)
+
+        # Subclasses must set _data_length and _total_size.
+
+    def __repr__(self):
+        return ("<%s at %r key %r>" % (self.__class__.__name__, self._container, self._key,))
+
+    def get_storage_index(self):
+        return self._storage_index
+
+    def get_storage_index_string(self):
+        return si_b2a(self._storage_index)
+
+    def get_shnum(self):
+        return self._shnum
+
+    def get_data_length(self):
+        return self._data_length
+
+    def get_size(self):
+        return self._total_size
+
+    def get_used_space(self):
+        # We're not charged for any per-object overheads in supported cloud services, so
+        # total object data sizes are what we're interested in for statistics and accounting.
+        return self.get_size()
+
+    def unlink(self):
+        self._discard()
+        return delete_chunks(self._container, self._key)
+
+    def _get_path(self):
+        """
+        When used with the mock cloud container, this returns the path of the file containing
+        the first chunk. For a real cloud container, it raises an error.
+        """
+        # It is OK that _get_path doesn't exist on real cloud container objects.
+        return self._container._get_path(self._key)
+
+
+class CloudShareReaderMixin:
+    """
+    Attributes:
+      _data_length: (integer) length of data excluding headers and leases
+      _chunksize:   (integer) size of each chunk possibly excluding the last
+      _cache:       (ChunkCache) the cache used to read chunks
+
+      DATA_OFFSET:  (integer) offset to the start-of-data from start of the sharefile
+    """
+    def readv(self, readv):
+        sorted_readv = sorted(zip(readv, xrange(len(readv))))
+        datav = [None]*len(readv)
+        for (v, i) in sorted_readv:
+            (offset, length) = v
+            datav[i] = self.read_share_data(offset, length)
+        return gatherResults(datav)
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+
+        # Reads beyond the end of the data are truncated.
+        # Reads that start beyond the end of the data return an empty string.
+        seekpos = self.DATA_OFFSET + offset
+        actuallength = max(0, min(length, self._data_length - offset))
+        if actuallength == 0:
+            return defer.succeed("")
+
+        lastpos = seekpos + actuallength - 1
+        _assert(lastpos > 0, seekpos=seekpos, actuallength=actuallength, lastpos=lastpos)
+        start_chunknum = seekpos / self._chunksize
+        start_offset   = seekpos % self._chunksize
+        last_chunknum  = lastpos / self._chunksize
+        last_offset    = lastpos % self._chunksize
+        _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum)
+
+        parts = deque()
+
+        def _load_part(ign, chunknum):
+            # determine which part of this chunk we need
+            start = 0
+            end = self._chunksize
+            if chunknum == start_chunknum:
+                start = start_offset
+            if chunknum == last_chunknum:
+                end = last_offset + 1
+            #print "LOAD", get_chunk_key(self._key, chunknum), start, end
+
+            # d2 fires when we should continue loading the next chunk; chunkdata_d fires with the actual data.
+            chunkdata_d = defer.Deferred()
+            d2 = self._cache.get(chunknum, chunkdata_d)
+            if start > 0 or end < self._chunksize:
+                chunkdata_d.addCallback(lambda chunkdata: chunkdata[start : end])
+            parts.append(chunkdata_d)
+            return d2
+
+        d = defer.succeed(None)
+        for i in xrange(start_chunknum, last_chunknum + 1):
+            d.addCallback(_load_part, i)
+        d.addCallback(lambda ign: gatherResults(parts))
+        d.addCallback(lambda pieces: ''.join(pieces))
+        return d
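+
+    # Worked example (illustrative), assuming DATA_OFFSET == 12 (as for immutable shares)
+    # and _chunksize == 524288: read_share_data(offset=524200, length=200) gives
+    # seekpos = 524212 and lastpos = 524411, so it returns bytes [524212:524288] of chunk 0
+    # (76 bytes) followed by bytes [0:124] of chunk 1 (124 bytes), 200 bytes in total.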
+
+
+class CloudError(Exception):
+    pass
+
+
+BACKOFF_SECONDS_FOR_5XX = (0, 2, 10)
+
+
+class ContainerRetryMixin:
+    """
+    I provide a helper method for performing an operation on a cloud container that will retry up to
+    len(BACKOFF_SECONDS_FOR_5XX) times (not including the initial try). If the initial try fails, a
+    single incident will be triggered after the operation has succeeded or failed.
+    """
+
+    def _do_request(self, description, operation, *args, **kwargs):
+        d = defer.maybeDeferred(operation, *args, **kwargs)
+        def _retry(f):
+            d2 = self._handle_error(f, 1, None, description, operation, *args, **kwargs)
+            def _trigger_incident(res):
+                log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s",
+                        arguments=args[:2], kwargs=kwargs, description=description,
+                        level=log.WEIRD)
+                return res
+            d2.addBoth(_trigger_incident)
+            return d2
+        d.addErrback(_retry)
+        return d
+
+    def _handle_error(self, f, trynum, first_err_and_tb, description, operation, *args, **kwargs):
+        f.trap(self.ServiceError)
+
+        # Don't use f.getTracebackObject() since a fake traceback will not do for the 3-arg form of 'raise'.
+        # tb can be None (which is acceptable for 3-arg raise) if we don't have a traceback.
+        tb = getattr(f, 'tb', None)
+        fargs = f.value.args
+        if len(fargs) > 2 and fargs[2] and '<code>signaturedoesnotmatch</code>' in fargs[2].lower():
+            fargs = fargs[:2] + ("SignatureDoesNotMatch response redacted",) + fargs[3:]
+
+        args_without_data = args[:2]
+        msg = "try %d failed: %s %s %s" % (trynum, description, args_without_data, kwargs)
+        err = CloudError(msg, *fargs)
+
+        # This should not trigger an incident; we want to do that at the end.
+        log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(fargs)s",
+                trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, fargs=repr(fargs),
+                level=log.INFREQUENT)
+
+        if first_err_and_tb is None:
+            first_err_and_tb = (err, tb)
+
+        if trynum > len(BACKOFF_SECONDS_FOR_5XX):
+            # If we run out of tries, raise the error we got on the first try (which *may* have
+            # a more useful traceback).
+            (first_err, first_tb) = first_err_and_tb
+            raise first_err.__class__, first_err, first_tb
+
+        fargs = f.value.args
+        if len(fargs) > 0 and int(fargs[0]) >= 500 and int(fargs[0]) < 600:
+            # Retry on 5xx errors.
+            d = task.deferLater(reactor, BACKOFF_SECONDS_FOR_5XX[trynum-1], operation, *args, **kwargs)
+            d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs)
+            return d
+
+        # If we get an error response other than a 5xx, raise that error even if it was on a retry.
+        raise err.__class__, err, tb
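+
+    # Example (illustrative) of how a container class uses this mixin: MockContainer in
+    # mock_cloud.py routes each operation through _do_request, e.g.
+    #   def get_object(self, object_name):
+    #       return self._do_request('GET object', self._get_object, self.container_name, object_name)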
+
+
+def concat(seqs):
+    """
+    O(n), rather than O(n^2), concatenation of list-like things, returning a list.
+    I can't believe this isn't built in.
+    """
+    total_len = 0
+    for seq in seqs:
+        total_len += len(seq)
+    result = [None]*total_len
+    i = 0
+    for seq in seqs:
+        for x in seq:
+            result[i] = x
+            i += 1
+    _assert(i == total_len, i=i, total_len=total_len)
+    return result
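+
+# Example (illustrative): concat([[1, 2], (3, 4), []]) == [1, 2, 3, 4]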
+
+
+class ContainerListMixin:
+    """
+    S3 has a limitation of 1000 object entries returned on each list (GET Bucket) request.
+    I provide a helper method to repeat the call as many times as necessary to get a full
+    listing. The container is assumed to implement:
+
+    def list_some_objects(self, **kwargs):
+        # kwargs may include 'prefix' and 'marker' parameters as documented at
+        # <http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html>.
+        # returns Deferred ContainerListing
+
+    Note that list_some_objects is assumed to be reliable; so, if retries are needed,
+    the container class should also inherit from ContainerRetryMixin and list_some_objects
+    should make the request via _do_request.
+
+    The 'delimiter' parameter of the GET Bucket API is not supported.
+    """
+    def list_objects(self, prefix=''):
+        kwargs = {'prefix': prefix}
+        all_contents = deque()
+        def _list_some():
+            d2 = self.list_some_objects(**kwargs)
+            def _got_listing(res):
+                all_contents.append(res.contents)
+                if res.is_truncated == "true":
+                    _assert(len(res.contents) > 0)
+                    marker = res.contents[-1].key
+                    _assert('marker' not in kwargs or marker > kwargs['marker'],
+                            "Not making progress in list_objects", kwargs=kwargs, marker=marker)
+                    kwargs['marker'] = marker
+                    return _list_some()
+                else:
+                    _assert(res.is_truncated == "false", is_truncated=res.is_truncated)
+                    return res
+            d2.addCallback(_got_listing)
+            return d2
+
+        d = _list_some()
+        d.addCallback(lambda res: res.__class__(res.name, res.prefix, res.marker, res.max_keys,
+                                                "false", concat(all_contents)))
+        def _log(f):
+            log.msg(f, level=log.WEIRD)
+            return f
+        d.addErrback(_log)
+        return d
+
+
+class BackpressurePipeline(object):
+    """
+    I manage a pipeline of Deferred operations that allows the data source to feel backpressure
+    when the pipeline is "full". I do not actually limit the number of operations in progress.
+    """
+    OPEN = 0
+    CLOSING = 1
+    CLOSED = 2
+
+    def __init__(self, capacity):
+        self._capacity = capacity  # how full we can be before causing calls to 'add' to block
+        self._gauge = 0            # how full we are
+        self._waiting = []         # callers of add() who are blocked
+        self._unfinished = 0       # number of pending operations
+        self._result_d = defer.Deferred()
+        self._state = self.OPEN
+
+    def add(self, _size, _func, *args, **kwargs):
+        if self._state == self.CLOSED:
+            msg = "add() called on closed BackpressurePipeline"
+            log.err(msg, level=log.WEIRD)
+            def _already_closed(): raise AssertionError(msg)
+            return defer.execute(_already_closed)
+        self._gauge += _size
+        self._unfinished += 1
+        fd = defer.maybeDeferred(_func, *args, **kwargs)
+        fd.addBoth(self._call_finished, _size)
+        fd.addErrback(log.err, "BackpressurePipeline._call_finished raised an exception")
+        if self._gauge < self._capacity:
+            return defer.succeed(None)
+        d = defer.Deferred()
+        self._waiting.append(d)
+        return d
+
+    def fail(self, f):
+        if self._state != self.CLOSED:
+            self._state = self.CLOSED
+            eventually_errback(self._result_d)(f)
+
+    def flush(self):
+        if self._state == self.CLOSED:
+            return defer.succeed(self._result_d)
+
+        d = self.close()
+        d.addBoth(self.reopen)
+        return d
+
+    def close(self):
+        if self._state != self.CLOSED:
+            if self._unfinished == 0:
+                self._state = self.CLOSED
+                eventually_callback(self._result_d)(None)
+            else:
+                self._state = self.CLOSING
+        return self._result_d
+
+    def reopen(self, res=None):
+        _assert(self._state == self.CLOSED, state=self._state)
+        self._result_d = defer.Deferred()
+        self._state = self.OPEN
+        return res
+
+    def _call_finished(self, res, size):
+        self._unfinished -= 1
+        self._gauge -= size
+        if isinstance(res, Failure):
+            self.fail(res)
+
+        if self._state == self.CLOSING:
+            # repeat the unfinished == 0 check
+            self.close()
+
+        if self._state == self.CLOSED:
+            while self._waiting:
+                d = self._waiting.pop(0)
+                eventual_chain(self._result_d, d)
+        elif self._gauge < self._capacity:
+            while self._waiting:
+                d = self._waiting.pop(0)
+                eventually_callback(d)(None)
+        return None
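+
+# Example (illustrative) usage of BackpressurePipeline, as in the share classes that store
+# chunks (see immutable.py and mutable.py):
+#   pipeline = BackpressurePipeline(PIPELINE_DEPTH)
+#   d = pipeline.add(1, container.put_object, chunkkey, chunkdata)  # may ask the caller to wait
+#   ...
+#   d2 = pipeline.close()  # fires once every added operation has completed or failed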
+
+
+class ChunkCache(object):
+    """I cache chunks for a specific share object."""
+
+    def __init__(self, container, key, chunksize, nchunks=1, initial_cachemap={}):
+        self._container = container
+        self._key = key
+        self._chunksize = chunksize
+        self._nchunks = nchunks
+
+        # chunknum -> deferred data
+        self._cachemap = initial_cachemap
+        self._pipeline = BackpressurePipeline(PIPELINE_DEPTH)
+
+    def set_nchunks(self, nchunks):
+        self._nchunks = nchunks
+
+    def _load_chunk(self, chunknum, chunkdata_d):
+        d = self._container.get_object(get_chunk_key(self._key, chunknum))
+        eventual_chain(source=d, target=chunkdata_d)
+        return d
+
+    def get(self, chunknum, result_d):
+        if chunknum in self._cachemap:
+            # cache hit; never stall
+            eventual_chain(source=self._cachemap[chunknum], target=result_d)
+            return defer.succeed(None)
+
+        # Evict any chunks other than the first and last two, until there are
+        # three or fewer chunks left cached.
+        for candidate_chunknum in self._cachemap.keys():
+            if len(self._cachemap) <= 3:
+                break
+            if candidate_chunknum not in (0, self._nchunks-2, self._nchunks-1):
+                self.flush_chunk(candidate_chunknum)
+
+        # cache miss; stall when the pipeline is full
+        chunkdata_d = defer.Deferred()
+        d = self._pipeline.add(1, self._load_chunk, chunknum, chunkdata_d)
+        def _check(res):
+            _assert(res is not None)
+            return res
+        chunkdata_d.addCallback(_check)
+        self._cachemap[chunknum] = chunkdata_d
+        eventual_chain(source=chunkdata_d, target=result_d)
+        return d
+
+    def flush_chunk(self, chunknum):
+        if chunknum in self._cachemap:
+            del self._cachemap[chunknum]
+
+    def close(self):
+        self._cachemap = None
+        return self._pipeline.close()
diff --git a/src/allmydata/storage/backends/cloud/immutable.py b/src/allmydata/storage/backends/cloud/immutable.py
new file mode 100644 (file)
index 0000000..cc29298
--- /dev/null
@@ -0,0 +1,188 @@
+
+import struct
+
+from cStringIO import StringIO
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IShareForReading, IShareForWriting
+
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.mathutil import div_ceil
+from allmydata.storage.common import UnknownImmutableContainerVersionError, DataTooLargeError
+from allmydata.storage.backends.cloud import cloud_common
+from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, \
+     BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin
+
+
+# Each share file (stored in the chunks with keys 'shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK')
+# contains lease information [currently inaccessible] and share data. The share data is
+# accessed by RIBucketWriter.write and RIBucketReader.read .
+
+# The share file has the following layout:
+#  0x00: share file version number, four bytes, current version is 1
+#  0x04: always zero (was share data length prior to Tahoe-LAFS v1.3.0)
+#  0x08: number of leases, four bytes big-endian
+#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
+#  data_length + 0x0c: first lease. Each lease record is 72 bytes. (not used)
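+#
+# Worked example (illustrative): a share holding 1000 bytes of share data is stored with a
+# 12-byte header, struct.pack(">LLL", 1, 0, 0), followed by the 1000 data bytes; no leases
+# are written, so get_size() == 1012 and get_data_length() == 1000.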
+
+
+class ImmutableCloudShareMixin:
+    sharetype = "immutable"
+    LEASE_SIZE = struct.calcsize(">L32s32sL")  # for compatibility
+    HEADER = ">LLL"
+    HEADER_SIZE = struct.calcsize(HEADER)
+    DATA_OFFSET = HEADER_SIZE
+
+
+class ImmutableCloudShareForWriting(CloudShareBase, ImmutableCloudShareMixin):
+    implements(IShareForWriting)
+
+    def __init__(self, container, storage_index, shnum, allocated_data_length, incomingset):
+        """
+        I won't allow more than allocated_data_length to be written to me.
+        """
+        precondition(isinstance(allocated_data_length, (int, long)), allocated_data_length)
+        CloudShareBase.__init__(self, container, storage_index, shnum)
+
+        self._chunksize = cloud_common.PREFERRED_CHUNK_SIZE
+        self._allocated_data_length = allocated_data_length
+
+        self._buf = StringIO()
+        # The second field, which was the four-byte share data length in
+        # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
+        # We also write 0 for the number of leases.
+        self._buf.write(struct.pack(self.HEADER, 1, 0, 0) )
+        self._set_size(self._buf.tell())
+        self._current_chunknum = 0
+
+        self._incomingset = incomingset
+        self._incomingset.add( (storage_index, shnum) )
+
+        self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH)
+
+    def _set_size(self, size):
+        self._total_size = size
+        self._data_length = size - self.DATA_OFFSET  # no leases
+
+    def get_allocated_data_length(self):
+        return self._allocated_data_length
+
+    def write_share_data(self, offset, data):
+        """Write 'data' at position 'offset' past the end of the header."""
+        seekpos = self.DATA_OFFSET + offset
+        precondition(seekpos >= self._total_size, offset=offset, seekpos=seekpos, total_size=self._total_size)
+        if offset + len(data) > self._allocated_data_length:
+            raise DataTooLargeError(self._allocated_data_length, offset, len(data))
+
+        self._set_size(self._total_size + len(data))
+        return self._store_or_buffer( (seekpos, data, 0) )
+
+    def close(self):
+        chunkdata = self._buf.getvalue()
+        self._discard()
+        d = self._pipeline_store_next_chunk(chunkdata)
+        d.addCallback(lambda ign: self._pipeline.close())
+        return d
+
+    def _store_or_buffer(self, (seekpos, b, b_offset) ):
+        """
+        Helper method that stores the next complete chunk to the container or buffers
+        an incomplete chunk. The data still to be written is b[b_offset:], but we may
+        only process part of it in this call.
+        """
+        chunknum = seekpos / self._chunksize
+        offset_in_chunk = seekpos % self._chunksize
+
+        _assert(chunknum >= self._current_chunknum, seekpos=seekpos, chunknum=chunknum,
+                current_chunknum=self._current_chunknum)
+
+        if chunknum > self._current_chunknum or offset_in_chunk + (len(b) - b_offset) >= self._chunksize:
+            if chunknum > self._current_chunknum:
+                # The write left a gap that spans a chunk boundary. Fill with zeroes to the end
+                # of the current chunk and store it.
+                # TODO: test this case
+                self._buf.seek(self._chunksize - 1)
+                self._buf.write("\x00")
+            else:
+                # Store a complete chunk.
+                writelen = self._chunksize - offset_in_chunk
+                self._buf.seek(offset_in_chunk)
+                self._buf.write(b[b_offset : b_offset + writelen])
+                seekpos += writelen
+                b_offset += writelen
+
+            chunkdata = self._buf.getvalue()
+            self._buf = StringIO()
+            _assert(len(chunkdata) == self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize)
+
+            d2 = self._pipeline_store_next_chunk(chunkdata)
+            d2.addCallback(lambda ign: self._store_or_buffer( (seekpos, b, b_offset) ))
+            return d2
+        else:
+            # Buffer an incomplete chunk.
+            if b_offset > 0:
+                b = b[b_offset :]
+            self._buf.seek(offset_in_chunk)
+            self._buf.write(b)
+            return defer.succeed(None)
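+
+    # Worked example (illustrative), with the default 512 KiB (524288-byte) chunks: a single
+    # write_share_data(0, data) call with len(data) == 614400 fills and stores chunk 0
+    # (the 12-byte header plus 524276 data bytes), then buffers the remaining 90124 bytes
+    # as the start of chunk 1, to be stored by a later write or by close().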
+
+    def _pipeline_store_next_chunk(self, chunkdata):
+        chunkkey = get_chunk_key(self._key, self._current_chunknum)
+        self._current_chunknum += 1
+        #print "STORING", chunkkey, len(chunkdata)
+
+        # We'd like to stream writes, but the supported service containers
+        # (and the IContainer interface) don't support that yet. For txaws, see
+        # https://bugs.launchpad.net/txaws/+bug/767205 and
+        # https://bugs.launchpad.net/txaws/+bug/783801
+        return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata)
+
+    def _discard(self):
+        self._buf = None
+        self._incomingset.discard( (self.get_storage_index(), self.get_shnum()) )
+
+
+class ImmutableCloudShareForReading(CloudShareBase, ImmutableCloudShareMixin, CloudShareReaderMixin):
+    implements(IShareForReading)
+
+    def __init__(self, container, storage_index, shnum, total_size, first_chunkdata):
+        CloudShareBase.__init__(self, container, storage_index, shnum)
+
+        precondition(isinstance(total_size, (int, long)), total_size=total_size)
+        precondition(isinstance(first_chunkdata, str), type(first_chunkdata))
+        precondition(len(first_chunkdata) <= total_size, len_first_chunkdata=len(first_chunkdata), total_size=total_size)
+
+        chunksize = len(first_chunkdata)
+        if chunksize < self.HEADER_SIZE:
+            msg = "%r had incomplete header (%d bytes)" % (self, chunksize)
+            raise UnknownImmutableContainerVersionError(msg)
+
+        self._total_size = total_size
+        self._chunksize = chunksize
+        nchunks = div_ceil(total_size, chunksize)
+        initial_cachemap = {0: defer.succeed(first_chunkdata)}
+        self._cache = ChunkCache(container, self._key, chunksize, nchunks, initial_cachemap)
+        #print "ImmutableCloudShareForReading", total_size, chunksize, self._key
+
+        header = first_chunkdata[:self.HEADER_SIZE]
+        (version, unused, num_leases) = struct.unpack(self.HEADER, header)
+
+        if version != 1:
+            msg = "%r had version %d but we wanted 1" % (self, version)
+            raise UnknownImmutableContainerVersionError(msg)
+
+        # We cannot write leases in share files, but allow them to be present
+        # in case a share file is copied from a disk backend, or in case we
+        # need them in future.
+        self._data_length = total_size - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
+
+        # TODO: raise a better exception.
+        _assert(self._data_length >= 0, data_length=self._data_length)
+
+    # Boilerplate is in CloudShareBase, read implementation is in CloudShareReaderMixin.
+    # So nothing to implement here. Yay!
+
+    def _discard(self):
+        pass
diff --git a/src/allmydata/storage/backends/cloud/mock_cloud.py b/src/allmydata/storage/backends/cloud/mock_cloud.py
new file mode 100644 (file)
index 0000000..7367762
--- /dev/null
@@ -0,0 +1,231 @@
+
+import os.path
+
+from twisted.internet import defer
+from twisted.web.error import Error
+from allmydata.util.deferredutil import async_iterate
+
+from zope.interface import implements
+
+from allmydata.storage.backends.cloud.cloud_common import IContainer, \
+     ContainerRetryMixin, ContainerListMixin
+from allmydata.util.time_format import iso_utc
+from allmydata.util import fileutil
+
+
+MAX_KEYS = 1000
+
+
+def configure_mock_cloud_backend(storedir, config):
+    from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
+
+    container = MockContainer(storedir)
+    return CloudBackend(container)
+
+
+class MockContainer(ContainerRetryMixin, ContainerListMixin):
+    implements(IContainer)
+    """
+    I represent a mock cloud container that stores its data in the local filesystem.
+    I also keep track of the number of loads and stores.
+    """
+
+    def __init__(self, storagedir):
+        self._storagedir = storagedir
+        self.container_name = "MockContainer"
+        self.ServiceError = MockServiceError
+        self._load_count = 0
+        self._store_count = 0
+
+        fileutil.make_dirs(os.path.join(self._storagedir, "shares"))
+
+    def __repr__(self):
+        return ("<%s at %r>" % (self.__class__.__name__, self._storagedir,))
+
+    def _create(self):
+        return defer.execute(self._not_implemented)
+
+    def _delete(self):
+        return defer.execute(self._not_implemented)
+
+    def _iterate_dirs(self):
+        shares_dir = os.path.join(self._storagedir, "shares")
+        for prefixstr in sorted(fileutil.listdir(shares_dir)):
+            prefixkey = "shares/%s" % (prefixstr,)
+            prefixdir = os.path.join(shares_dir, prefixstr)
+            for sistr in sorted(fileutil.listdir(prefixdir)):
+                sikey = "%s/%s" % (prefixkey, sistr)
+                sidir = os.path.join(prefixdir, sistr)
+                for shnumstr in sorted(fileutil.listdir(sidir)):
+                    sharefile = os.path.join(sidir, shnumstr)
+                    yield (sharefile, "%s/%s" % (sikey, shnumstr))
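+
+    # Example (illustrative): a file at <storagedir>/shares/dg/dglev.../2 is yielded with
+    # the key "shares/dg/dglev.../2", matching the cloud key layout in cloud_common.py.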
+
+    def _list_some_objects(self, ign, prefix='', marker=None, max_keys=None):
+        if max_keys is None:
+            max_keys = MAX_KEYS
+        contents = []
+        def _next_share(res):
+            if res is None:
+                return
+            (sharefile, sharekey) = res
+            # note that all strings are > None
+            if sharekey.startswith(prefix) and sharekey > marker:
+                stat_result = os.stat(sharefile)
+                mtime_utc = iso_utc(stat_result.st_mtime, sep=' ')+'+00:00'
+                item = ContainerItem(key=sharekey, modification_date=mtime_utc, etag="",
+                                     size=stat_result.st_size, storage_class="STANDARD")
+                contents.append(item)
+            return len(contents) < max_keys
+
+        d = async_iterate(_next_share, self._iterate_dirs())
+        def _done(completed):
+            contents.sort(key=lambda item: item.key)
+            return ContainerListing(self.container_name, '', '', max_keys,
+                                    is_truncated=str(not completed).lower(), contents=contents)
+        d.addCallback(_done)
+        return d
+
+    def _get_path(self, object_name, must_exist=False):
+        # This method is also called by tests.
+        sharefile = os.path.join(self._storagedir, object_name)
+        if must_exist and not os.path.exists(sharefile):
+            raise MockServiceError("", 404, "not found")
+        return sharefile
+
+    def _put_object(self, ign, object_name, data, content_type, metadata):
+        assert content_type is None, content_type
+        assert metadata == {}, metadata
+        sharefile = self._get_path(object_name)
+        fileutil.make_dirs(os.path.dirname(sharefile))
+        fileutil.write(sharefile, data)
+        self._store_count += 1
+        return defer.succeed(None)
+
+    def _get_object(self, ign, object_name):
+        self._load_count += 1
+        data = fileutil.read(self._get_path(object_name, must_exist=True))
+        return defer.succeed(data)
+
+    def _head_object(self, ign, object_name):
+        return defer.execute(self._not_implemented)
+
+    def _delete_object(self, ign, object_name):
+        fileutil.remove(self._get_path(object_name, must_exist=True))
+        return defer.succeed(None)
+
+    def _not_implemented(self):
+        raise NotImplementedError
+
+    # methods that use error handling from ContainerRetryMixin
+
+    def create(self):
+        return self._do_request('create bucket', self._create, self.container_name)
+
+    def delete(self):
+        return self._do_request('delete bucket', self._delete, self.container_name)
+
+    def list_some_objects(self, **kwargs):
+        return self._do_request('list objects', self._list_some_objects, self.container_name, **kwargs)
+
+    def put_object(self, object_name, data, content_type=None, metadata={}):
+        return self._do_request('PUT object', self._put_object, self.container_name, object_name,
+                                data, content_type, metadata)
+
+    def get_object(self, object_name):
+        return self._do_request('GET object', self._get_object, self.container_name, object_name)
+
+    def head_object(self, object_name):
+        return self._do_request('HEAD object', self._head_object, self.container_name, object_name)
+
+    def delete_object(self, object_name):
+        return self._do_request('DELETE object', self._delete_object, self.container_name, object_name)
+
+    def reset_load_store_counts(self):
+        self._load_count = 0
+        self._store_count = 0
+
+    def get_load_count(self):
+        return self._load_count
+
+    def get_store_count(self):
+        return self._store_count
+
+
+class MockServiceError(Error):
+    """
+    An error class similar to txaws' S3Error.
+    """
+    def __init__(self, xml_bytes, status, message=None, response=None, request_id="", host_id=""):
+        Error.__init__(self, status, message, response)
+        self.original = xml_bytes
+        self.status = str(status)
+        self.message = str(message)
+        self.request_id = request_id
+        self.host_id = host_id
+
+    def get_error_code(self):
+        return self.status
+
+    def get_error_message(self):
+        return self.message
+
+    def parse(self, xml_bytes=""):
+        raise NotImplementedError
+
+    def has_error(self, errorString):
+        raise NotImplementedError
+
+    def get_error_codes(self):
+        raise NotImplementedError
+
+    def get_error_messages(self):
+        raise NotImplementedError
+
+
+# Originally from txaws.s3.model (under different class names), which was under the MIT / Expat licence.
+
+class ContainerItem(object):
+    """
+    An item in a listing of cloud objects.
+    """
+    def __init__(self, key, modification_date, etag, size, storage_class,
+                 owner=None):
+        self.key = key
+        self.modification_date = modification_date
+        self.etag = etag
+        self.size = size
+        self.storage_class = storage_class
+        self.owner = owner
+
+    def __repr__(self):
+        return "<ContainerItem %r>" % ({
+                   "key": self.key,
+                   "modification_date": self.modification_date,
+                   "etag": self.etag,
+                   "size": self.size,
+                   "storage_class": self.storage_class,
+                   "owner": self.owner,
+               },)
+
+
+class ContainerListing(object):
+    def __init__(self, name, prefix, marker, max_keys, is_truncated,
+                 contents=None, common_prefixes=None):
+        self.name = name
+        self.prefix = prefix
+        self.marker = marker
+        self.max_keys = max_keys
+        self.is_truncated = is_truncated
+        self.contents = contents
+        self.common_prefixes = common_prefixes
+
+    def __repr__(self):
+        return "<ContainerListing %r>" % ({
+                   "name": self.name,
+                   "prefix": self.prefix,
+                   "marker": self.marker,
+                   "max_keys": self.max_keys,
+                   "is_truncated": self.is_truncated,
+                   "contents": self.contents,
+                   "common_prefixes": self.common_prefixes,
+               })
diff --git a/src/allmydata/storage/backends/cloud/mutable.py b/src/allmydata/storage/backends/cloud/mutable.py
new file mode 100644 (file)
index 0000000..f8ad096
--- /dev/null
@@ -0,0 +1,470 @@
+
+import struct
+from collections import deque
+
+from twisted.internet import defer
+from allmydata.util.deferredutil import gatherResults, async_iterate
+
+from zope.interface import implements
+
+from allmydata.interfaces import IMutableShare, BadWriteEnablerError
+from allmydata.util import idlib, log
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.mathutil import div_ceil
+from allmydata.util.hashutil import timing_safe_compare
+from allmydata.storage.common import UnknownMutableContainerVersionError, DataTooLargeError
+from allmydata.storage.backends.base import testv_compare
+from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.backends.cloud import cloud_common
+from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, get_zero_chunkdata, \
+     delete_chunks, BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin
+
+
+# Mutable shares have a different layout to immutable shares. See docs/mutable.rst
+# for more details.
+
+# #   offset    size    name
+# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
+# 2   32        20      write enabler's nodeid
+# 3   52        32      write enabler
+# 4   84        8       data size (actual share data present) (a)
+# 5   92        8       offset of (8) count of extra leases (after data)
+# 6   100       368     four leases, 92 bytes each, unused
+# 7   468       (a)     data
+# 8   ??        4       count of extra leases
+# 9   ??        n*92    extra leases
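+#
+# Worked example (illustrative): the offsets above follow from the struct formats used below:
+# DATA_LENGTH_OFFSET = calcsize(">32s20s32s") = 84, EXTRA_LEASE_OFFSET = 84 + 8 = 92,
+# HEADER_SIZE = calcsize(">32s20s32sQQ") = 100, LEASE_SIZE = calcsize(">LL32s32s20s") = 92,
+# and DATA_OFFSET = 100 + 4*92 = 468.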
+
+
+# The struct module doc says that L's are 4 bytes in size, and that Q's are
+# 8 bytes in size. Since compatibility depends upon this, double-check it.
+assert struct.calcsize(">L") == 4, struct.calcsize(">L")
+assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
+
+
+class Namespace(object):
+    pass
+
+
+class MutableCloudShare(CloudShareBase, CloudShareReaderMixin):
+    implements(IMutableShare)
+
+    sharetype = "mutable"
+    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
+    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
+    HEADER = ">32s20s32sQQ"
+    HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases
+    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
+    assert LEASE_SIZE == 92, LEASE_SIZE
+    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+    assert DATA_OFFSET == 468, DATA_OFFSET
+    NUM_EXTRA_LEASES_SIZE = struct.calcsize(">L")
+
+    MAGIC = MUTABLE_MAGIC
+    assert len(MAGIC) == 32
+    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
+
+    def __init__(self, container, storage_index, shnum, total_size, first_chunkdata, parent=None):
+        CloudShareBase.__init__(self, container, storage_index, shnum)
+
+        precondition(isinstance(total_size, (int, long)), total_size=total_size)
+        precondition(isinstance(first_chunkdata, str), type(first_chunkdata))
+        precondition(len(first_chunkdata) <= total_size, "total size is smaller than first chunk",
+                     len_first_chunkdata=len(first_chunkdata), total_size=total_size)
+
+        if len(first_chunkdata) < self.HEADER_SIZE:
+            msg = "%r had incomplete header (%d bytes)" % (self, len(first_chunkdata))
+            raise UnknownMutableContainerVersionError(msg)
+
+        header = first_chunkdata[:self.HEADER_SIZE]
+        (magic, write_enabler_nodeid, real_write_enabler,
+         data_length, extra_lease_offset) = struct.unpack(self.HEADER, header)
+
+        if magic != self.MAGIC:
+            msg = "%r had magic %r but we wanted %r" % (self, magic, self.MAGIC)
+            raise UnknownMutableContainerVersionError(msg)
+
+        self._write_enabler_nodeid = write_enabler_nodeid
+        self._real_write_enabler = real_write_enabler
+
+        # We want to support changing PREFERRED_CHUNK_SIZE without breaking compatibility,
+        # but without "rechunking" any existing shares. Also, existing shares created by
+        # the pre-chunking code should be handled correctly.
+
+        # If there is more than one chunk, the chunksize must be equal to the size of the
+        # first chunk, to avoid rechunking.
+        self._chunksize = len(first_chunkdata)
+        if self._chunksize == total_size:
+            # There is only one chunk, so we are at liberty to make the chunksize larger
+            # than that chunk, but not smaller.
+            self._chunksize = max(self._chunksize, cloud_common.PREFERRED_CHUNK_SIZE)
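+        # For example (illustrative numbers, not a constraint imposed here): a share
+        # originally written with 1 MiB chunks keeps a 1 MiB chunksize even if
+        # PREFERRED_CHUNK_SIZE later changes, because existing shares are never
+        # rechunked; only a share that currently fits in one chunk can adopt the
+        # preferred size.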
+
+        self._zero_chunkdata = get_zero_chunkdata(self._chunksize)
+
+        initial_cachemap = {0: defer.succeed(first_chunkdata)}
+        self._cache = ChunkCache(container, self._key, self._chunksize, initial_cachemap=initial_cachemap)
+        #print "CONSTRUCT %s with %r" % (object.__repr__(self), self._cache)
+        self._data_length = data_length
+        self._set_total_size(self.DATA_OFFSET + data_length + self.NUM_EXTRA_LEASES_SIZE)
+
+        # The initial total size may not be less than the size of header + data + extra lease count.
+        # TODO: raise a better exception.
+        _assert(total_size >= self._total_size, share=repr(self),
+                total_size=total_size, self_total_size=self._total_size, data_length=data_length)
+        self._is_oversize = total_size > self._total_size
+
+        self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH)
+
+        self.parent = parent # for logging
+
+    def _set_total_size(self, total_size):
+        self._total_size = total_size
+        self._nchunks = div_ceil(self._total_size, self._chunksize)
+        self._cache.set_nchunks(self._nchunks)
+
+    def log(self, *args, **kwargs):
+        if self.parent:
+            return self.parent.log(*args, **kwargs)
+
+    @classmethod
+    def create_empty_share(cls, container, serverid, write_enabler, storage_index=None, shnum=None, parent=None):
+        # Unlike the disk backend, we don't check that the cloud object does not exist;
+        # we assume that it does not because create was used, and no-one else should be
+        # writing to the bucket.
+
+        # There are no extra leases, but for compatibility, the offset they would have
+        # still needs to be stored in the header.
+        data_length = 0
+        extra_lease_offset = cls.DATA_OFFSET + data_length
+        header = struct.pack(cls.HEADER, cls.MAGIC, serverid, write_enabler,
+                             data_length, extra_lease_offset)
+        leases = "\x00"*(cls.LEASE_SIZE * 4)
+        extra_lease_count = struct.pack(">L", 0)
+        first_chunkdata = header + leases + extra_lease_count
+
+        share = cls(container, storage_index, shnum, len(first_chunkdata), first_chunkdata, parent=parent)
+
+        d = share._raw_writev(deque([(0, first_chunkdata)]), 0, 0)
+        d.addCallback(lambda ign: share)
+        return d
+
+    def _discard(self):
+        # TODO: discard read cache
+        pass
+
+    def check_write_enabler(self, write_enabler):
+        # avoid a timing attack
+        if not timing_safe_compare(write_enabler, self._real_write_enabler):
+            # accommodate share migration by reporting the nodeid used for the
+            # old write enabler.
+            self.log(format="bad write enabler on SI %(si)s,"
+                     " recorded by nodeid %(nodeid)s",
+                     facility="tahoe.storage",
+                     level=log.WEIRD, umid="DF2fCR",
+                     si=self.get_storage_index_string(),
+                     nodeid=idlib.nodeid_b2a(self._write_enabler_nodeid))
+            msg = "The write enabler was recorded by nodeid '%s'." % \
+                  (idlib.nodeid_b2a(self._write_enabler_nodeid),)
+            raise BadWriteEnablerError(msg)
+        return defer.succeed(None)
+
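+    # Note (added for clarity; the element format is inferred from how testv_compare
+    # is called below): each test vector element is assumed to be a tuple
+    # (offset, length, operator, specimen), e.g. (0, 4, "eq", "\x00\x00\x00\x01"),
+    # which passes only if the 4 bytes read at offset 0 compare equal to the specimen.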
+    def check_testv(self, testv):
+        def _test( (offset, length, operator, specimen) ):
+            d = self.read_share_data(offset, length)
+            d.addCallback(lambda data: testv_compare(data, operator, specimen))
+            return d
+        return async_iterate(_test, sorted(testv))
+
+    def writev(self, datav, new_length):
+        precondition(new_length is None or new_length >= 0, new_length=new_length)
+
+        raw_datav, preserved_size, new_data_length = self._prepare_writev(datav, new_length)
+        return self._raw_writev(raw_datav, preserved_size, new_data_length)
+
+    def _prepare_writev(self, datav, new_length):
+        # Translate the client's write vector and 'new_length' into a "raw" write vector
+        # and new total size. This has no side effects to make it easier to test.
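+        # Worked example (hypothetical values): on a share whose data length is
+        # currently 0, a client writev of [(0, "A"*100)] with new_length=None becomes
+        # three raw writes, sorted by seek position:
+        #   (84,  struct.pack(">Q", 100))    # new data length, in the header
+        #   (468, "A"*100)                   # the data itself, at DATA_OFFSET
+        #   (568, struct.pack(">L", 0))      # extra lease count, just past the data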
+
+        preserved_size = self.DATA_OFFSET + self._data_length
+
+        # chunk containing the byte after the current end-of-data
+        endofdata_chunknum = preserved_size / self._chunksize
+
+        # Whether we need to add a dummy write to zero-extend the end-of-data chunk.
+        ns = Namespace()
+        ns.need_zeroextend_write = preserved_size % self._chunksize != 0
+
+        raw_datav = deque()
+        def _add_write(seekpos, data):
+            #print "seekpos =", seekpos
+            raw_datav.append( (seekpos, data) )
+
+            lastpos = seekpos + len(data) - 1
+            start_chunknum = seekpos / self._chunksize
+            last_chunknum  = lastpos / self._chunksize
+            if start_chunknum <= endofdata_chunknum and endofdata_chunknum <= last_chunknum:
+                # If any of the client's writes overlaps the end-of-data chunk, we should not
+                # add the zero-extending dummy write.
+                ns.need_zeroextend_write = False
+
+        #print "need_zeroextend_write =", ns.need_zeroextend_write
+        new_data_length = self._data_length
+
+        # Validate the write vector and translate its offsets into seek positions from
+        # the start of the share.
+        for (offset, data) in datav:
+            length = len(data)
+            precondition(offset >= 0, offset=offset)
+            if offset + length > self.MAX_SIZE:
+                raise DataTooLargeError()
+
+            if new_length is not None and new_length < offset + length:
+                length = max(0, new_length - offset)
+                data = data[: length]
+
+            new_data_length = max(new_data_length, offset + length)
+            if length > 0:
+                _add_write(self.DATA_OFFSET + offset, data)
+
+        # new_length can only be used to truncate, not extend.
+        if new_length is not None:
+            new_data_length = min(new_length, new_data_length)
+
+        # If the data length has changed, include additional raw writes to the data length
+        # field in the header, and to the extra lease count field after the data.
+        #
+        # Also do this if there were extra leases (e.g. if this was a share copied from a
+        # disk backend), so that they will be deleted. If the size hasn't changed and there
+        # are no extra leases, we don't bother to ensure that the extra lease count field is
+        # zero; it is ignored anyway.
+        if new_data_length != self._data_length or self._is_oversize:
+            extra_lease_offset = self.DATA_OFFSET + new_data_length
+
+            # Don't preserve old data past the new end-of-data.
+            preserved_size = min(preserved_size, extra_lease_offset)
+
+            # These are disjoint with any ranges already in raw_datav.
+            _add_write(self.DATA_LENGTH_OFFSET, struct.pack(">Q", new_data_length))
+            _add_write(extra_lease_offset, struct.pack(">L", 0))
+
+        #print "need_zeroextend_write =", ns.need_zeroextend_write
+        # If the data length is being increased and there are no other writes to the
+        # current end-of-data chunk (including the two we just added), add a dummy write
+        # of one zero byte at the end of that chunk. This will cause that chunk to be
+        # zero-extended to the full chunk size, which would not otherwise happen.
+        if new_data_length > self._data_length and ns.need_zeroextend_write:
+            _add_write((endofdata_chunknum + 1)*self._chunksize - 1, "\x00")
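+        # For example (illustrative): if the current data ends partway through chunk 2
+        # and the only client write lands entirely in chunk 5, nothing above touches
+        # chunk 2, so a one-byte "\x00" write at its last byte is added here; when that
+        # chunk is rewritten it is zero-extended to the full chunksize.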
+
+        # Sorting the writes simplifies things (and we need all the simplification we can get :-)
+        raw_datav = deque(sorted(raw_datav, key=lambda (offset, data): offset))
+
+        # Complain if write vector elements overlap; handling overlaps in general is too hard.
+        (last_seekpos, last_data) = (0, "")
+        have_duplicates = False
+        for (i, (seekpos, data)) in enumerate(raw_datav):
+            # The MDMF publisher in 1.9.0 and 1.9.1 produces duplicated writes to the MDMF header.
+            # If this is an exactly duplicated write, skip it.
+            if seekpos == last_seekpos and data == last_data:
+                raw_datav[i] = None
+                have_duplicates = True
+            else:
+                last_endpos = last_seekpos + len(last_data)
+                _assert(seekpos >= last_endpos, "overlapping write vector elements",
+                        seekpos=seekpos, last_seekpos=last_seekpos, last_endpos=last_endpos)
+            (last_seekpos, last_data) = (seekpos, data)
+
+        if have_duplicates:
+            raw_datav.remove(None)
+
+        # Return a vector of writes to ranges in the share, the size of previous contents to
+        # be preserved, and the final data length.
+        return (raw_datav, preserved_size, new_data_length)
+
+    def _raw_writev(self, raw_datav, preserved_size, new_data_length):
+        #print "%r._raw_writev(%r, %r, %r)" % (self, raw_datav, preserved_size, new_data_length)
+
+        old_nchunks = self._nchunks
+
+        # The _total_size and _nchunks attributes are updated as each write is applied.
+        self._set_total_size(preserved_size)
+
+        final_size = self.DATA_OFFSET + new_data_length + self.NUM_EXTRA_LEASES_SIZE
+
+        d = self._raw_write_share_data(None, raw_datav, final_size)
+
+        def _resize(ign):
+            self._data_length = new_data_length
+            self._set_total_size(final_size)
+
+            if self._nchunks < old_nchunks or self._is_oversize:
+                self._is_oversize = False
+                #print "DELETING chunks from", self._nchunks
+                return delete_chunks(self._container, self._key, from_chunknum=self._nchunks)
+        d.addCallback(_resize)
+
+        d.addCallback(lambda ign: self._pipeline.flush())
+        return d
+
+    def _raw_write_share_data(self, ign, raw_datav, final_size):
+        """
+        raw_datav:  (deque of (integer, str)) the remaining raw write vector
+        final_size: (integer) the size the file will be after all writes in the writev
+        """
+        #print "%r._raw_write_share_data(%r, %r)" % (self, (seekpos, data), final_size)
+
+        precondition(final_size >= 0, final_size=final_size)
+
+        d = defer.succeed(None)
+        if not raw_datav:
+            return d
+
+        (seekpos, data) = raw_datav.popleft()
+        _assert(seekpos >= 0 and len(data) > 0, seekpos=seekpos, len_data=len(data),
+                len_raw_datav=len(raw_datav), final_size=final_size)
+
+        # We *may* need to read the start chunk and/or last chunk before rewriting them.
+        # (If they are the same chunk, that's fine, the cache will ensure we don't
+        # read the cloud object twice.)
+        lastpos = seekpos + len(data) - 1
+        _assert(lastpos > 0, seekpos=seekpos, len_data=len(data), lastpos=lastpos)
+        start_chunknum = seekpos / self._chunksize
+        start_chunkpos = start_chunknum*self._chunksize
+        start_offset   = seekpos % self._chunksize
+        last_chunknum  = lastpos / self._chunksize
+        last_chunkpos  = last_chunknum*self._chunksize
+        last_offset    = lastpos % self._chunksize
+        _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum)
+
+        #print "lastpos         =", lastpos
+        #print "len(data)       =", len(data)
+        #print "start_chunknum  =", start_chunknum
+        #print "start_offset    =", start_offset
+        #print "last_chunknum   =", last_chunknum
+        #print "last_offset     =", last_offset
+        #print "_total_size     =", self._total_size
+        #print "_chunksize      =", self._chunksize
+        #print "_nchunks        =", self._nchunks
+
+        start_chunkdata_d = defer.Deferred()
+        last_chunkdata_d = defer.Deferred()
+
+        # Is the first byte of the start chunk preserved?
+        if start_chunknum*self._chunksize < self._total_size and start_offset > 0:
+            # Yes, so we need to read it first.
+            d.addCallback(lambda ign: self._cache.get(start_chunknum, start_chunkdata_d))
+        else:
+            start_chunkdata_d.callback("")
+
+        # Is any byte of the last chunk preserved?
+        if last_chunkpos < self._total_size and lastpos < min(self._total_size, last_chunkpos + self._chunksize) - 1:
+            # Yes, so we need to read it first.
+            d.addCallback(lambda ign: self._cache.get(last_chunknum, last_chunkdata_d))
+        else:
+            last_chunkdata_d.callback("")
+
+        d.addCallback(lambda ign: gatherResults( (start_chunkdata_d, last_chunkdata_d) ))
+        def _got( (start_chunkdata, last_chunkdata) ):
+            #print "start_chunkdata =", len(start_chunkdata), repr(start_chunkdata)
+            #print "last_chunkdata  =", len(last_chunkdata),  repr(last_chunkdata)
+            d2 = defer.succeed(None)
+
+            # Zero any chunks from self._nchunks (i.e. after the last currently valid chunk)
+            # to before the start chunk of the write.
+            for zero_chunknum in xrange(self._nchunks, start_chunknum):
+                d2.addCallback(self._pipeline_store_chunk, zero_chunknum, self._zero_chunkdata)
+
+            # start_chunkdata and last_chunkdata may need to be truncated and/or zero-extended.
+            start_preserved = max(0, min(len(start_chunkdata), self._total_size - start_chunkpos, start_offset))
+            last_preserved  = max(0, min(len(last_chunkdata), self._total_size - last_chunkpos))
+
+            start_chunkdata = (start_chunkdata[: start_preserved] +
+                               self._zero_chunkdata[: max(0, start_offset - start_preserved)] +
+                               data[: self._chunksize - start_offset])
+
+            # last_slice_len = len(last_chunkdata[last_offset + 1 : last_preserved])
+            last_slice_len  = max(0, last_preserved - (last_offset + 1))
+            last_chunksize  = min(final_size - last_chunkpos, self._chunksize)
+            last_chunkdata  = (last_chunkdata[last_offset + 1 : last_preserved] +
+                               self._zero_chunkdata[: max(0, last_chunksize - (last_offset + 1) - last_slice_len)])
+
+            # This loop eliminates redundant reads and writes, by merging the contents of writes
+            # after this one into last_chunkdata as far as possible. It ensures that we never need
+            # to read a chunk twice in the same writev (which is needed for correctness; see below).
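+            # For example (illustrative): if this write ends at byte 10 of chunk 7 and
+            # the next write in raw_datav starts at byte 500 of chunk 7, the part of
+            # the next write that falls inside chunk 7 is spliced into last_chunkdata
+            # now, so chunk 7 is read and stored only once during this writev.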
+            while raw_datav:
+                # Does the next write start in the same chunk as this write ends (last_chunknum)?
+                (next_seekpos, next_chunkdata) = raw_datav[0]
+                next_start_chunknum = next_seekpos / self._chunksize
+                next_start_offset   = next_seekpos % self._chunksize
+                next_lastpos        = next_seekpos + len(next_chunkdata) - 1
+
+                if next_start_chunknum != last_chunknum:
+                    break
+
+                _assert(next_start_offset > last_offset,
+                        next_start_offset=next_start_offset, last_offset=last_offset)
+
+                # Cut next_chunkdata at the end of next_start_chunknum.
+                next_cutpos = (next_start_chunknum + 1)*self._chunksize
+                last_chunkdata = (last_chunkdata[: next_start_offset - (last_offset + 1)] +
+                                  next_chunkdata[: next_cutpos - next_seekpos] +
+                                  last_chunkdata[next_lastpos - lastpos :])
+
+                # Does the next write extend beyond that chunk?
+                if next_lastpos >= next_cutpos:
+                    # The part after the cut will be processed in the next call to _raw_write_share_data.
+                    raw_datav[0] = (next_cutpos, next_chunkdata[next_cutpos - next_seekpos :])
+                    break
+                else:
+                    # Discard the write that has already been processed.
+                    raw_datav.popleft()
+
+            # start_chunknum and last_chunknum are going to be written, so need to be flushed
+            # from the read cache in case the new contents are needed by a subsequent readv
+            # or writev. (Due to the 'while raw_datav' loop above, we won't need to read them
+            # again in *this* writev. That property is needed for correctness because we don't
+            # flush the write pipeline until the end of the writev.)
+
+            d2.addCallback(lambda ign: self._cache.flush_chunk(start_chunknum))
+            d2.addCallback(lambda ign: self._cache.flush_chunk(last_chunknum))
+
+            # Now do the current write.
+            if last_chunknum == start_chunknum:
+                d2.addCallback(self._pipeline_store_chunk, start_chunknum,
+                               start_chunkdata + last_chunkdata)
+            else:
+                d2.addCallback(self._pipeline_store_chunk, start_chunknum,
+                               start_chunkdata)
+
+                for middle_chunknum in xrange(start_chunknum + 1, last_chunknum):
+                    d2.addCallback(self._pipeline_store_chunk, middle_chunknum,
+                                   data[middle_chunknum*self._chunksize - seekpos
+                                         : (middle_chunknum + 1)*self._chunksize - seekpos])
+
+                d2.addCallback(self._pipeline_store_chunk, last_chunknum,
+                               data[last_chunkpos - seekpos :] + last_chunkdata)
+            return d2
+        d.addCallback(_got)
+        d.addCallback(self._raw_write_share_data, raw_datav, final_size)  # continue the iteration
+        return d
+
+    def _pipeline_store_chunk(self, ign, chunknum, chunkdata):
+        precondition(len(chunkdata) <= self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize)
+
+        chunkkey = get_chunk_key(self._key, chunknum)
+        #print "STORING", chunkkey, len(chunkdata), repr(chunkdata)
+
+        endpos = chunknum*self._chunksize + len(chunkdata)
+        if endpos > self._total_size:
+            self._set_total_size(endpos)
+
+        # We'd like to stream writes, but the supported service containers
+        # (and the IContainer interface) don't support that yet. For txaws, see
+        # https://bugs.launchpad.net/txaws/+bug/767205 and
+        # https://bugs.launchpad.net/txaws/+bug/783801
+        return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata)
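+
+    # (Descriptive note, inferred from the names used above rather than stated in the
+    # upstream comments: BackpressurePipeline appears to bound the number of
+    # put_object calls in flight to PIPELINE_DEPTH, and a writev is only treated as
+    # complete once self._pipeline.flush() fires in _raw_writev.)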
+
+    def close(self):
+        # FIXME: 'close' doesn't exist in IMutableShare
+        self._discard()
+        d = self._pipeline.close()
+        d.addCallback(lambda ign: self._cache.close())
+        return d
\ No newline at end of file
diff --git a/src/allmydata/storage/backends/cloud/s3/s3_container.py b/src/allmydata/storage/backends/cloud/s3/s3_container.py
new file mode 100644 (file)
index 0000000..d1d6712
--- /dev/null
@@ -0,0 +1,101 @@
+
+from zope.interface import implements
+
+from allmydata.node import InvalidValueError
+from allmydata.storage.backends.cloud.cloud_common import IContainer, \
+     ContainerRetryMixin, ContainerListMixin
+
+
+def configure_s3_container(storedir, config):
+    from allmydata.storage.backends.cloud.s3.s3_container import S3Container
+
+    accesskeyid = config.get_config("storage", "s3.access_key_id")
+    secretkey = config.get_or_create_private_config("s3secret")
+    usertoken = config.get_optional_private_config("s3usertoken")
+    producttoken = config.get_optional_private_config("s3producttoken")
+    if producttoken and not usertoken:
+        raise InvalidValueError("If private/s3producttoken is present, private/s3usertoken must also be present.")
+    url = config.get_config("storage", "s3.url", "http://s3.amazonaws.com")
+    container_name = config.get_config("storage", "s3.bucket")
+
+    return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken)
+
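+# Illustrative tahoe.cfg fragment for the lookups above (the key names are taken
+# from the get_config calls; the values and the "backend" selector line are
+# placeholders, not taken from this file):
+#
+#   [storage]
+#   backend = cloud.s3
+#   s3.access_key_id = AKIAIOSFODNN7EXAMPLE
+#   s3.url = http://s3.amazonaws.com
+#   s3.bucket = mybucket
+#
+# The secret key lives in private/s3secret; private/s3usertoken and
+# private/s3producttoken are optional (the product token requires the user token).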
+
+class S3Container(ContainerRetryMixin, ContainerListMixin):
+    """
+    I represent a real S3 container (bucket), accessed using the txaws library.
+    """
+    implements(IContainer)
+
+    def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None):
+        # We only depend on txaws when this class is actually instantiated.
+        from txaws.credentials import AWSCredentials
+        from txaws.service import AWSServiceEndpoint
+        from txaws.s3.client import S3Client, Query
+        from txaws.s3.exception import S3Error
+
+        creds = AWSCredentials(access_key=access_key, secret_key=secret_key)
+        endpoint = AWSServiceEndpoint(uri=url)
+
+        query_factory = None
+        if usertoken is not None:
+            def make_query(*args, **kwargs):
+                amz_headers = kwargs.get("amz_headers", {})
+                if producttoken is not None:
+                    amz_headers["security-token"] = (usertoken, producttoken)
+                else:
+                    amz_headers["security-token"] = usertoken
+                kwargs["amz_headers"] = amz_headers
+
+                return Query(*args, **kwargs)
+            query_factory = make_query
+
+        self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory)
+        self.container_name = container_name
+        self.ServiceError = S3Error
+
+    def __repr__(self):
+        return ("<%s %r>" % (self.__class__.__name__, self.container_name,))
+
+    def create(self):
+        return self._do_request('create bucket', self.client.create, self.container_name)
+
+    def delete(self):
+        return self._do_request('delete bucket', self.client.delete, self.container_name)
+
+    def list_some_objects(self, **kwargs):
+        return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs)
+
+    def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+        return self._do_request('PUT object', self.client.put_object, self.container_name,
+                                object_name, data, content_type, metadata)
+
+    def get_object(self, object_name):
+        return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
+
+    def head_object(self, object_name):
+        return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
+
+    def delete_object(self, object_name):
+        return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
+
+    def put_policy(self, policy):
+        """
+        Set access control policy on a bucket.
+        """
+        query = self.client.query_factory(
+            action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
+            bucket=self.container_name, object_name='?policy', data=policy)
+        return self._do_request('PUT policy', query.submit)
+
+    def get_policy(self):
+        query = self.client.query_factory(
+            action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
+            bucket=self.container_name, object_name='?policy')
+        return self._do_request('GET policy', query.submit)
+
+    def delete_policy(self):
+        query = self.client.query_factory(
+            action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
+            bucket=self.container_name, object_name='?policy')
+        return self._do_request('DELETE policy', query.submit)
diff --git a/src/allmydata/storage/backends/disk/disk_backend.py b/src/allmydata/storage/backends/disk/disk_backend.py
new file mode 100644 (file)
index 0000000..2d24f69
--- /dev/null
@@ -0,0 +1,184 @@
+
+import struct, os.path
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet
+from allmydata.util import fileutil, log
+from allmydata.storage.common import si_b2a, si_a2b, NUM_RE, \
+     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.backends.base import Backend, ShareSet
+from allmydata.storage.backends.disk.immutable import load_immutable_disk_share, create_immutable_disk_share
+from allmydata.storage.backends.disk.mutable import load_mutable_disk_share, create_mutable_disk_share
+from allmydata.mutable.layout import MUTABLE_MAGIC
+
+
+# storage/
+# storage/shares/incoming
+#   incoming/ holds temp dirs named $PREFIX/$STORAGEINDEX/$SHNUM which will
+#   be moved to storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM upon success
+# storage/shares/$PREFIX/$STORAGEINDEX
+# storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM
+
+# where "$PREFIX" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
+# base-32 chars).
+
+
+def si_si2dir(startdir, storage_index):
+    sia = si_b2a(storage_index)
+    return os.path.join(startdir, sia[:2], sia)
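+
+# For example (hypothetical storage index whose base-32 form starts with "aq"):
+# si_si2dir("storage/shares", si) returns "storage/shares/aq/aq<rest of SI>",
+# i.e. the $PREFIX/$STORAGEINDEX layout described above.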
+
+def get_disk_share(home, storage_index=None, shnum=None):
+    f = open(home, 'rb')
+    try:
+        prefix = f.read(len(MUTABLE_MAGIC))
+    finally:
+        f.close()
+
+    if prefix == MUTABLE_MAGIC:
+        return load_mutable_disk_share(home, storage_index, shnum)
+    else:
+        # assume it's immutable
+        return load_immutable_disk_share(home, storage_index, shnum)
+
+
+def configure_disk_backend(storedir, config):
+    readonly = config.get_config("storage", "readonly", False, boolean=True)
+    reserved_space = config.get_config_size("storage", "reserved_space", "0")
+
+    return DiskBackend(storedir, readonly, reserved_space)
+
+
+class DiskBackend(Backend):
+    implements(IStorageBackend)
+
+    def __init__(self, storedir, readonly=False, reserved_space=0):
+        Backend.__init__(self)
+        self._storedir = storedir
+        self._readonly = readonly
+        self._reserved_space = int(reserved_space)
+        self._sharedir = os.path.join(self._storedir, 'shares')
+        fileutil.make_dirs(self._sharedir)
+        self._incomingdir = os.path.join(self._sharedir, 'incoming')
+        self._clean_incomplete()
+        if self._reserved_space and (self.get_available_space() is None):
+            log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
+                    umid="0wZ27w", level=log.UNUSUAL)
+
+    def _clean_incomplete(self):
+        fileutil.rm_dir(self._incomingdir)
+        fileutil.make_dirs(self._incomingdir)
+
+    def get_sharesets_for_prefix(self, prefix):
+        prefixdir = os.path.join(self._sharedir, prefix)
+        sharesets = [self.get_shareset(si_a2b(si_s))
+                     for si_s in sorted(fileutil.listdir(prefixdir))]
+        return defer.succeed(sharesets)
+
+    def get_shareset(self, storage_index):
+        sharehomedir = si_si2dir(self._sharedir, storage_index)
+        incominghomedir = si_si2dir(self._incomingdir, storage_index)
+        return DiskShareSet(storage_index, sharehomedir, incominghomedir)
+
+    def fill_in_space_stats(self, stats):
+        stats['storage_server.reserved_space'] = self._reserved_space
+        try:
+            disk = fileutil.get_disk_stats(self._sharedir, self._reserved_space)
+            writeable = disk['avail'] > 0
+
+            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
+            stats['storage_server.disk_total'] = disk['total']
+            stats['storage_server.disk_used'] = disk['used']
+            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
+            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
+            stats['storage_server.disk_avail'] = disk['avail']
+        except AttributeError:
+            writeable = True
+        except EnvironmentError:
+            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
+            writeable = False
+
+        if self._readonly:
+            stats['storage_server.disk_avail'] = 0
+            writeable = False
+
+        stats['storage_server.accepting_immutable_shares'] = int(writeable)
+
+    def get_available_space(self):
+        if self._readonly:
+            return 0
+        try:
+            return fileutil.get_available_space(self._sharedir, self._reserved_space)
+        except EnvironmentError:
+            return 0
+
+    def must_use_tubid_as_permutation_seed(self):
+        # A disk backend with existing shares must assume that it was around before #466,
+        # so must use its TubID as a permutation-seed.
+        return bool(set(fileutil.listdir(self._sharedir)) - set(["incoming"]))
+
+
+class DiskShareSet(ShareSet):
+    implements(IShareSet)
+
+    def __init__(self, storage_index, sharehomedir, incominghomedir=None):
+        ShareSet.__init__(self, storage_index)
+        self._sharehomedir = sharehomedir
+        self._incominghomedir = incominghomedir
+
+    def get_overhead(self):
+        return (fileutil.get_used_space(self._sharehomedir) +
+                fileutil.get_used_space(self._incominghomedir))
+
+    def get_shares(self):
+        si = self.get_storage_index()
+        shares = {}
+        corrupted = set()
+        for shnumstr in fileutil.listdir(self._sharehomedir, filter=NUM_RE):
+            shnum = int(shnumstr)
+            sharefile = os.path.join(self._sharehomedir, shnumstr)
+            try:
+                shares[shnum] = get_disk_share(sharefile, si, shnum)
+            except (UnknownMutableContainerVersionError,
+                    UnknownImmutableContainerVersionError,
+                    struct.error):
+                corrupted.add(shnum)
+
+        valid = [shares[shnum] for shnum in sorted(shares.keys())]
+        return defer.succeed( (valid, corrupted) )
+
+    def get_share(self, shnum):
+        return get_disk_share(os.path.join(self._sharehomedir, str(shnum)),
+                              self.get_storage_index(), shnum)
+
+    def delete_share(self, shnum):
+        fileutil.remove(os.path.join(self._sharehomedir, str(shnum)))
+        return defer.succeed(None)
+
+    def has_incoming(self, shnum):
+        if self._incominghomedir is None:
+            return False
+        return os.path.exists(os.path.join(self._incominghomedir, str(shnum)))
+
+    def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+        finalhome = os.path.join(self._sharehomedir, str(shnum))
+        incominghome = os.path.join(self._incominghomedir, str(shnum))
+        immsh = create_immutable_disk_share(incominghome, finalhome, allocated_data_length,
+                                            self.get_storage_index(), shnum)
+        bw = BucketWriter(account, immsh, canary)
+        return bw
+
+    def _create_mutable_share(self, account, shnum, write_enabler):
+        fileutil.make_dirs(self._sharehomedir)
+        sharehome = os.path.join(self._sharehomedir, str(shnum))
+        serverid = account.server.get_serverid()
+        return create_mutable_disk_share(sharehome, serverid, write_enabler,
+                                         self.get_storage_index(), shnum, parent=account.server)
+
+    def _clean_up_after_unlink(self):
+        fileutil.rmdir_if_empty(self._sharehomedir)
+
+    def _get_sharedir(self):
+        return self._sharehomedir
diff --git a/src/allmydata/storage/backends/null/null_backend.py b/src/allmydata/storage/backends/null/null_backend.py
new file mode 100644 (file)
index 0000000..dbd5c9e
--- /dev/null
@@ -0,0 +1,193 @@
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet, IShareBase, \
+    IShareForReading, IShareForWriting, IMutableShare
+
+from allmydata.util.assertutil import precondition
+from allmydata.storage.backends.base import Backend, ShareSet, empty_check_testv
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.common import si_b2a
+
+
+def configure_null_backend(storedir, config):
+    return NullBackend()
+
+
+class NullBackend(Backend):
+    """
+    I am a test backend that records (in memory) which shares exist, but not their
+    contents, leases, or write-enablers.
+    """
+    implements(IStorageBackend)
+
+    def __init__(self):
+        Backend.__init__(self)
+        # mapping from storage_index to NullShareSet
+        self._sharesets = {}
+
+    def get_available_space(self):
+        return None
+
+    def get_sharesets_for_prefix(self, prefix):
+        sharesets = []
+        for (si, shareset) in self._sharesets.iteritems():
+            if si_b2a(si).startswith(prefix):
+                sharesets.append(shareset)
+
+        def _by_base32si(b):
+            return b.get_storage_index_string()
+        sharesets.sort(key=_by_base32si)
+        return defer.succeed(sharesets)
+
+    def get_shareset(self, storage_index):
+        shareset = self._sharesets.get(storage_index, None)
+        if shareset is None:
+            shareset = NullShareSet(storage_index)
+            self._sharesets[storage_index] = shareset
+        return shareset
+
+    def fill_in_space_stats(self, stats):
+        pass
+
+
+class NullShareSet(ShareSet):
+    implements(IShareSet)
+
+    def __init__(self, storage_index):
+        self.storage_index = storage_index
+        self._incoming_shnums = set()
+        self._immutable_shnums = set()
+        self._mutable_shnums = set()
+
+    def close_shnum(self, shnum):
+        self._incoming_shnums.remove(shnum)
+        self._immutable_shnums.add(shnum)
+        return defer.succeed(None)
+
+    def get_overhead(self):
+        return 0
+
+    def get_shares(self):
+        shares = {}
+        for shnum in self._immutable_shnums:
+            shares[shnum] = ImmutableNullShare(self, shnum)
+        for shnum in self._mutable_shnums:
+            shares[shnum] = MutableNullShare(self, shnum)
+        # This backend never has any corrupt shares.
+        return defer.succeed( ([shares[shnum] for shnum in sorted(shares.keys())], set()) )
+
+    def get_share(self, shnum):
+        if shnum in self._immutable_shnums:
+            return defer.succeed(ImmutableNullShare(self, shnum))
+        elif shnum in self._mutable_shnums:
+            return defer.succeed(MutableNullShare(self, shnum))
+        else:
+            def _not_found(): raise IndexError("no such share %d" % (shnum,))
+            return defer.execute(_not_found)
+
+    def delete_share(self, shnum, include_incoming=False):
+        if include_incoming and (shnum in self._incoming_shnums):
+            self._incoming_shnums.remove(shnum)
+        if shnum in self._immutable_shnums:
+            self._immutable_shnums.remove(shnum)
+        if shnum in self._mutable_shnums:
+            self._mutable_shnums.remove(shnum)
+        return defer.succeed(None)
+
+    def has_incoming(self, shnum):
+        return shnum in self._incoming_shnums
+
+    def get_storage_index(self):
+        return self.storage_index
+
+    def get_storage_index_string(self):
+        return si_b2a(self.storage_index)
+
+    def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+        self._incoming_shnums.add(shnum)
+        immutableshare = ImmutableNullShare(self, shnum)
+        bw = BucketWriter(account, immutableshare, canary)
+        bw.throw_out_all_data = True
+        return bw
+
+
+class NullShareBase(object):
+    implements(IShareBase)
+
+    def __init__(self, shareset, shnum):
+        self.shareset = shareset
+        self.shnum = shnum
+
+    def get_storage_index(self):
+        return self.shareset.get_storage_index()
+
+    def get_storage_index_string(self):
+        return self.shareset.get_storage_index_string()
+
+    def get_shnum(self):
+        return self.shnum
+
+    def get_data_length(self):
+        return 0
+
+    def get_size(self):
+        return 0
+
+    def get_used_space(self):
+        return 0
+
+    def unlink(self):
+        return self.shareset.delete_share(self.shnum, include_incoming=True)
+
+    def readv(self, readv):
+        datav = []
+        for (offset, length) in readv:
+            datav.append("")
+        return defer.succeed(datav)
+
+    def get_leases(self):
+        pass
+
+    def add_lease(self, lease):
+        pass
+
+    def renew_lease(self, renew_secret, new_expire_time):
+        raise IndexError("unable to renew non-existent lease")
+
+    def add_or_renew_lease(self, lease_info):
+        pass
+
+
+class ImmutableNullShare(NullShareBase):
+    implements(IShareForReading, IShareForWriting)
+    sharetype = "immutable"
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+        return defer.succeed("")
+
+    def get_allocated_data_length(self):
+        return 0
+
+    def write_share_data(self, offset, data):
+        return defer.succeed(None)
+
+    def close(self):
+        return self.shareset.close_shnum(self.shnum)
+
+
+class MutableNullShare(NullShareBase):
+    implements(IMutableShare)
+    sharetype = "mutable"
+
+    def check_write_enabler(self, write_enabler):
+        # Null backend doesn't check write enablers.
+        return defer.succeed(None)
+
+    def check_testv(self, testv):
+        return defer.succeed(empty_check_testv(testv))
+
+    def writev(self, datav, new_length):
+        return defer.succeed(None)