--- /dev/null
+
+from twisted.application import service
+from twisted.internet import defer
+
+from allmydata.util.deferredutil import async_iterate, gatherResults
+from allmydata.storage.common import si_b2a
+from allmydata.storage.bucket import BucketReader
+from allmydata.storage.leasedb import SHARETYPE_MUTABLE
+
+
+class Backend(service.MultiService):
+ def __init__(self):
+ service.MultiService.__init__(self)
+
+ def must_use_tubid_as_permutation_seed(self):
+ # New backends cannot have been around before #466, and so have no backward
+ # compatibility requirements for permutation seeds. The disk backend overrides this.
+ return False
+
+
+class ShareSet(object):
+ """
+ This class implements shareset logic that could work for all backends, but
+ might be useful to override for efficiency.
+ """
+ # TODO: queue operations on a shareset to ensure atomicity for each fully
+ # successful operation (#1869).
+
+ def __init__(self, storage_index):
+ self.storage_index = storage_index
+
+ def get_storage_index(self):
+ return self.storage_index
+
+ def get_storage_index_string(self):
+ return si_b2a(self.storage_index)
+
+ def make_bucket_reader(self, account, share):
+ return BucketReader(account, share)
+
+ def testv_and_readv_and_writev(self, write_enabler,
+ test_and_write_vectors, read_vector,
+ expiration_time, account):
+ # The implementation here depends on the following helper methods,
+ # which must be provided by subclasses:
+ #
+ # def _clean_up_after_unlink(self):
+ # """clean up resources associated with the shareset after some
+ # shares might have been deleted"""
+ #
+ # def _create_mutable_share(self, account, shnum, write_enabler):
+ # """create a mutable share with the given shnum and write_enabler"""
+
+ sharemap = {}
+ d = self.get_shares()
+ def _got_shares( (shares, corrupted) ):
+ d2 = defer.succeed(None)
+ for share in shares:
+ assert not isinstance(share, defer.Deferred), share
+ # XXX is it correct to ignore immutable shares? Maybe get_shares should
+ # have a parameter saying what type it's expecting.
+ if share.sharetype == "mutable":
+ d2.addCallback(lambda ign, share=share: share.check_write_enabler(write_enabler))
+ sharemap[share.get_shnum()] = share
+
+ shnums = sorted(sharemap.keys())
+
+ # if d2 does not fail, write_enabler is good for all existing shares
+
+ # now evaluate test vectors
+ def _check_testv(shnum):
+ (testv, datav, new_length) = test_and_write_vectors[shnum]
+ if shnum in sharemap:
+ d3 = sharemap[shnum].check_testv(testv)
+ elif shnum in corrupted:
+ # a corrupted share does not match any test vector
+ d3 = defer.succeed(False)
+ else:
+ # compare the vectors against an empty share, in which all
+ # reads return empty strings
+ d3 = defer.succeed(empty_check_testv(testv))
+
+ def _check_result(res):
+ if not res:
+ account.server.log("testv failed: [%d] %r" % (shnum, testv))
+ return res
+ d3.addCallback(_check_result)
+ return d3
+
+ d2.addCallback(lambda ign: async_iterate(_check_testv, test_and_write_vectors))
+
+ def _gather(testv_is_good):
+ # Gather the read vectors, before we do any writes. This ignores any
+ # corrupted shares.
+ d3 = gatherResults([sharemap[shnum].readv(read_vector) for shnum in shnums])
+
+ def _do_writes(reads):
+ read_data = {}
+ for i in range(len(shnums)):
+ read_data[shnums[i]] = reads[i]
+
+ d4 = defer.succeed(None)
+ if testv_is_good:
+ if len(set(test_and_write_vectors.keys()) & corrupted) > 0:
+ # XXX think of a better exception to raise
+ raise AssertionError("You asked to write share numbers %r of storage index %r, "
+ "but one or more of those is corrupt (numbers %r)"
+ % (list(sorted(test_and_write_vectors.keys())),
+ self.get_storage_index_string(),
+ list(sorted(corrupted))) )
+
+ # now apply the write vectors
+ for shnum in test_and_write_vectors:
+ (testv, datav, new_length) = test_and_write_vectors[shnum]
+ if new_length == 0:
+ if shnum in sharemap:
+ d4.addCallback(lambda ign, shnum=shnum:
+ sharemap[shnum].unlink())
+ d4.addCallback(lambda ign, shnum=shnum:
+ account.remove_share_and_leases(self.storage_index, shnum))
+ else:
+ if shnum not in sharemap:
+ # allocate a new share
+ d4.addCallback(lambda ign, shnum=shnum:
+ self._create_mutable_share(account, shnum,
+ write_enabler))
+ def _record_share(share, shnum=shnum):
+ sharemap[shnum] = share
+ account.add_share(self.storage_index, shnum, share.get_used_space(),
+ SHARETYPE_MUTABLE)
+ d4.addCallback(_record_share)
+ d4.addCallback(lambda ign, shnum=shnum, datav=datav, new_length=new_length:
+ sharemap[shnum].writev(datav, new_length))
+ def _update_lease(ign, shnum=shnum):
+ account.add_or_renew_default_lease(self.storage_index, shnum)
+ account.mark_share_as_stable(self.storage_index, shnum,
+ sharemap[shnum].get_used_space())
+ d4.addCallback(_update_lease)
+
+ if new_length == 0:
+ d4.addCallback(lambda ign: self._clean_up_after_unlink())
+
+ d4.addCallback(lambda ign: (testv_is_good, read_data))
+ return d4
+ d3.addCallback(_do_writes)
+ return d3
+ d2.addCallback(_gather)
+ return d2
+ d.addCallback(_got_shares)
+ return d
+
+ def readv(self, wanted_shnums, read_vector):
+ """
+ Read a vector from the numbered shares in this shareset. An empty
+ shares list means to return data from all known shares.
+
+ @param wanted_shnums=ListOf(int)
+ @param read_vector=ReadVector
+ @return DictOf(int, ReadData): shnum -> results, with one key per share
+ """
+ shnums = []
+ dreads = []
+ d = self.get_shares()
+ def _got_shares( (shares, corrupted) ):
+ # We ignore corrupted shares.
+ for share in shares:
+ assert not isinstance(share, defer.Deferred), share
+ shnum = share.get_shnum()
+ if not wanted_shnums or shnum in wanted_shnums:
+ shnums.append(share.get_shnum())
+ dreads.append(share.readv(read_vector))
+ return gatherResults(dreads)
+ d.addCallback(_got_shares)
+
+ def _got_reads(reads):
+ datavs = {}
+ for i in range(len(shnums)):
+ datavs[shnums[i]] = reads[i]
+ return datavs
+ d.addCallback(_got_reads)
+ return d
+
+
+def testv_compare(a, op, b):
+ assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+ if op == "lt":
+ return a < b
+ if op == "le":
+ return a <= b
+ if op == "eq":
+ return a == b
+ if op == "ne":
+ return a != b
+ if op == "ge":
+ return a >= b
+ if op == "gt":
+ return a > b
+ # never reached
+
+
+def empty_check_testv(testv):
+ test_good = True
+ for (offset, length, operator, specimen) in testv:
+ data = ""
+ if not testv_compare(data, operator, specimen):
+ test_good = False
+ break
+ return test_good
--- /dev/null
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet
+
+from allmydata.node import InvalidValueError
+from allmydata.util.deferredutil import gatherResults
+from allmydata.util.assertutil import _assert
+from allmydata.util.dictutil import NumDict
+from allmydata.util.encodingutil import quote_output
+from allmydata.storage.common import si_a2b, NUM_RE
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.backends.base import Backend, ShareSet
+from allmydata.storage.backends.cloud.immutable import ImmutableCloudShareForReading, ImmutableCloudShareForWriting
+from allmydata.storage.backends.cloud.mutable import MutableCloudShare
+from allmydata.storage.backends.cloud.cloud_common import get_share_key, delete_chunks
+from allmydata.mutable.layout import MUTABLE_MAGIC
+
+
+def get_cloud_share(container, storage_index, shnum, total_size):
+ key = get_share_key(storage_index, shnum)
+ d = container.get_object(key)
+ def _make_share(first_chunkdata):
+ if first_chunkdata.startswith(MUTABLE_MAGIC):
+ return MutableCloudShare(container, storage_index, shnum, total_size, first_chunkdata)
+ else:
+ # assume it's immutable
+ return ImmutableCloudShareForReading(container, storage_index, shnum, total_size, first_chunkdata)
+ d.addCallback(_make_share)
+ return d
+
+
+def configure_cloud_backend(storedir, config):
+ # REMIND: when multiple container implementations are supported, only import the container we're going to use.
+ from allmydata.storage.backends.cloud.s3.s3_container import configure_s3_container
+
+ if config.get_config("storage", "readonly", False, boolean=True):
+ raise InvalidValueError("[storage]readonly is not supported by the cloud backend; "
+ "make the container read-only instead.")
+
+ backendtype = config.get_config("storage", "backend", "disk")
+ if backendtype == "s3":
+ backendtype = "cloud.s3"
+
+ container_configurators = {
+ 'cloud.s3': configure_s3_container,
+ }
+
+ if backendtype not in container_configurators:
+ raise InvalidValueError("%s is not supported by the cloud backend; it must be one of %s"
+ % (quote_output("[storage]backend = " + backendtype), container_configurators.keys()) )
+
+ container = container_configurators[backendtype](storedir, config)
+ return CloudBackend(container)
+
+
+class CloudBackend(Backend):
+ implements(IStorageBackend)
+
+ def __init__(self, container):
+ Backend.__init__(self)
+ self._container = container
+
+ # set of (storage_index, shnum) of incoming shares
+ self._incomingset = set()
+
+ def get_sharesets_for_prefix(self, prefix):
+ d = self._container.list_objects(prefix='shares/%s/' % (prefix,))
+ def _get_sharesets(res):
+ # XXX this enumerates all shares to get the set of SIs.
+ # Is there a way to enumerate SIs more efficiently?
+ si_strings = set()
+ for item in res.contents:
+ # XXX better error handling
+ path = item.key.split('/')
+ _assert(path[0:2] == ["shares", prefix], path=path, prefix=prefix)
+ si_strings.add(path[2])
+
+ # XXX we want this to be deterministic, so we return the sharesets sorted
+ # by their si_strings, but we shouldn't need to explicitly re-sort them
+ # because list_objects returns a sorted list.
+ return [CloudShareSet(si_a2b(s), self._container, self._incomingset) for s in sorted(si_strings)]
+ d.addCallback(_get_sharesets)
+ return d
+
+ def get_shareset(self, storage_index):
+ return CloudShareSet(storage_index, self._container, self._incomingset)
+
+ def fill_in_space_stats(self, stats):
+ # TODO: query space usage of container if supported.
+ # TODO: query whether the container is read-only and set
+ # accepting_immutable_shares accordingly.
+ stats['storage_server.accepting_immutable_shares'] = 1
+
+ def get_available_space(self):
+ # TODO: query space usage of container if supported.
+ return 2**64
+
+
+class CloudShareSet(ShareSet):
+ implements(IShareSet)
+
+ def __init__(self, storage_index, container, incomingset):
+ ShareSet.__init__(self, storage_index)
+ self._container = container
+ self._incomingset = incomingset
+ self._key = get_share_key(storage_index)
+
+ def get_overhead(self):
+ return 0
+
+ def get_shares(self):
+ d = self._container.list_objects(prefix=self._key)
+ def _get_shares(res):
+ si = self.get_storage_index()
+ shnum_to_total_size = NumDict()
+ for item in res.contents:
+ key = item.key
+ _assert(key.startswith(self._key), key=key, self_key=self._key)
+ path = key.split('/')
+ if len(path) == 4:
+ (shnumstr, _, chunknumstr) = path[3].partition('.')
+ chunknumstr = chunknumstr or '0'
+ if NUM_RE.match(shnumstr) and NUM_RE.match(chunknumstr):
+ # The size is taken as the sum of sizes for all chunks, but for simplicity
+ # we don't check here that the individual chunk sizes match expectations.
+ # If they don't, that will cause an error on reading.
+ shnum_to_total_size.add_num(int(shnumstr), int(item.size))
+
+ return gatherResults([get_cloud_share(self._container, si, shnum, total_size)
+ for (shnum, total_size) in shnum_to_total_size.items_sorted_by_key()])
+ d.addCallback(_get_shares)
+ # TODO: return information about corrupt shares.
+ d.addCallback(lambda shares: (shares, set()) )
+ return d
+
+ def get_share(self, shnum):
+ key = "%s%d" % (self._key, shnum)
+ d = self._container.list_objects(prefix=key)
+ def _get_share(res):
+ total_size = 0
+ for item in res.contents:
+ total_size += item.size
+ return get_cloud_share(self._container, self.get_storage_index(), shnum, total_size)
+ d.addCallback(_get_share)
+ return d
+
+ def delete_share(self, shnum):
+ key = "%s%d" % (self._key, shnum)
+ return delete_chunks(self._container, key)
+
+ def has_incoming(self, shnum):
+ return (self.get_storage_index(), shnum) in self._incomingset
+
+ def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+ immsh = ImmutableCloudShareForWriting(self._container, self.get_storage_index(), shnum,
+ allocated_data_length, self._incomingset)
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: BucketWriter(account, immsh, canary))
+ return d
+
+ def _create_mutable_share(self, account, shnum, write_enabler):
+ serverid = account.server.get_serverid()
+ return MutableCloudShare.create_empty_share(self._container, serverid, write_enabler,
+ self.get_storage_index(), shnum, parent=account.server)
+
+ def _clean_up_after_unlink(self):
+ pass
+
+ def _get_sharedir(self):
+ # For use by tests, only with the mock cloud backend.
+ # It is OK that _get_path doesn't exist on real container objects.
+ return self._container._get_path(self._key)
--- /dev/null
+
+from collections import deque
+
+from twisted.internet import defer, reactor, task
+from twisted.python.failure import Failure
+
+from zope.interface import Interface, implements
+from allmydata.interfaces import IShareBase
+
+from allmydata.util import log
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.deferredutil import eventually_callback, eventually_errback, eventual_chain, gatherResults
+from allmydata.storage.common import si_b2a, NUM_RE
+
+
+# The container has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK
+
+def get_share_key(si, shnum=None):
+ sistr = si_b2a(si)
+ if shnum is None:
+ return "shares/%s/%s/" % (sistr[:2], sistr)
+ else:
+ return "shares/%s/%s/%d" % (sistr[:2], sistr, shnum)
+
+def get_chunk_key(share_key, chunknum):
+ precondition(chunknum >= 0, chunknum=chunknum)
+ if chunknum == 0:
+ return share_key
+ else:
+ return "%s.%d" % (share_key, chunknum)
+
+
+PREFERRED_CHUNK_SIZE = 512*1024
+PIPELINE_DEPTH = 4
+
+ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE
+
+def get_zero_chunkdata(size):
+ if size <= PREFERRED_CHUNK_SIZE:
+ return ZERO_CHUNKDATA[: size]
+ else:
+ return "\x00"*size
+
+
+class IContainer(Interface):
+ """
+ I represent a cloud container.
+ """
+ def create():
+ """
+ Create this container.
+ """
+
+ def delete():
+ """
+ Delete this container.
+ The cloud service may require the container to be empty before it can be deleted.
+ """
+
+ def list_objects(prefix=''):
+ """
+ Get a ContainerListing that lists objects in this container.
+
+ prefix: (str) limit the returned keys to those starting with prefix.
+ """
+
+ def put_object(object_name, data, content_type=None, metadata={}):
+ """
+ Put an object in this bucket.
+ Any existing object of the same name will be replaced.
+ """
+
+ def get_object(object_name):
+ """
+ Get an object from this container.
+ """
+
+ def head_object(object_name):
+ """
+ Retrieve object metadata only.
+ """
+
+ def delete_object(object_name):
+ """
+ Delete an object from this container.
+ Once deleted, there is no method to restore or undelete an object.
+ """
+
+
+def delete_chunks(container, share_key, from_chunknum=0):
+ d = container.list_objects(prefix=share_key)
+ def _delete(res):
+ def _suppress_404(f):
+ e = f.trap(container.ServiceError)
+ if e.get_error_code() != 404:
+ return f
+
+ d2 = defer.succeed(None)
+ for item in res.contents:
+ key = item.key
+ _assert(key.startswith(share_key), key=key, share_key=share_key)
+ path = key.split('/')
+ if len(path) == 4:
+ (_, _, chunknumstr) = path[3].partition('.')
+ chunknumstr = chunknumstr or "0"
+ if NUM_RE.match(chunknumstr) and int(chunknumstr) >= from_chunknum:
+ d2.addCallback(lambda ign, key=key: container.delete_object(key))
+ d2.addErrback(_suppress_404)
+ return d2
+ d.addCallback(_delete)
+ return d
+
+
+class CloudShareBase(object):
+ implements(IShareBase)
+ """
+ Attributes:
+ _container: (IContainer) the cloud container that stores this share
+ _storage_index: (str) binary storage index
+ _shnum: (integer) share number
+ _key: (str) the key prefix under which this share will be stored (no .chunknum suffix)
+ _data_length: (integer) length of data excluding headers and leases
+ _total_size: (integer) total size of the sharefile
+
+ Methods:
+ _discard(self): object will no longer be used; discard references to potentially large data
+ """
+ def __init__(self, container, storage_index, shnum):
+ precondition(IContainer.providedBy(container), container=container)
+ precondition(isinstance(storage_index, str), storage_index=storage_index)
+ precondition(isinstance(shnum, int), shnum=shnum)
+
+ # These are always known immediately.
+ self._container = container
+ self._storage_index = storage_index
+ self._shnum = shnum
+ self._key = get_share_key(storage_index, shnum)
+
+ # Subclasses must set _data_length and _total_size.
+
+ def __repr__(self):
+ return ("<%s at %r key %r>" % (self.__class__.__name__, self._container, self._key,))
+
+ def get_storage_index(self):
+ return self._storage_index
+
+ def get_storage_index_string(self):
+ return si_b2a(self._storage_index)
+
+ def get_shnum(self):
+ return self._shnum
+
+ def get_data_length(self):
+ return self._data_length
+
+ def get_size(self):
+ return self._total_size
+
+ def get_used_space(self):
+ # We're not charged for any per-object overheads in supported cloud services, so
+ # total object data sizes are what we're interested in for statistics and accounting.
+ return self.get_size()
+
+ def unlink(self):
+ self._discard()
+ return delete_chunks(self._container, self._key)
+
+ def _get_path(self):
+ """
+ When used with the mock cloud container, this returns the path of the file containing
+ the first chunk. For a real cloud container, it raises an error.
+ """
+ # It is OK that _get_path doesn't exist on real cloud container objects.
+ return self._container._get_path(self._key)
+
+
+class CloudShareReaderMixin:
+ """
+ Attributes:
+ _data_length: (integer) length of data excluding headers and leases
+ _chunksize: (integer) size of each chunk possibly excluding the last
+ _cache: (ChunkCache) the cache used to read chunks
+
+ DATA_OFFSET: (integer) offset to the start-of-data from start of the sharefile
+ """
+ def readv(self, readv):
+ sorted_readv = sorted(zip(readv, xrange(len(readv))))
+ datav = [None]*len(readv)
+ for (v, i) in sorted_readv:
+ (offset, length) = v
+ datav[i] = self.read_share_data(offset, length)
+ return gatherResults(datav)
+
+ def read_share_data(self, offset, length):
+ precondition(offset >= 0)
+
+ # Reads beyond the end of the data are truncated.
+ # Reads that start beyond the end of the data return an empty string.
+ seekpos = self.DATA_OFFSET + offset
+ actuallength = max(0, min(length, self._data_length - offset))
+ if actuallength == 0:
+ return defer.succeed("")
+
+ lastpos = seekpos + actuallength - 1
+ _assert(lastpos > 0, seekpos=seekpos, actuallength=actuallength, lastpos=lastpos)
+ start_chunknum = seekpos / self._chunksize
+ start_offset = seekpos % self._chunksize
+ last_chunknum = lastpos / self._chunksize
+ last_offset = lastpos % self._chunksize
+ _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum)
+
+ parts = deque()
+
+ def _load_part(ign, chunknum):
+ # determine which part of this chunk we need
+ start = 0
+ end = self._chunksize
+ if chunknum == start_chunknum:
+ start = start_offset
+ if chunknum == last_chunknum:
+ end = last_offset + 1
+ #print "LOAD", get_chunk_key(self._key, chunknum), start, end
+
+ # d2 fires when we should continue loading the next chunk; chunkdata_d fires with the actual data.
+ chunkdata_d = defer.Deferred()
+ d2 = self._cache.get(chunknum, chunkdata_d)
+ if start > 0 or end < self._chunksize:
+ chunkdata_d.addCallback(lambda chunkdata: chunkdata[start : end])
+ parts.append(chunkdata_d)
+ return d2
+
+ d = defer.succeed(None)
+ for i in xrange(start_chunknum, last_chunknum + 1):
+ d.addCallback(_load_part, i)
+ d.addCallback(lambda ign: gatherResults(parts))
+ d.addCallback(lambda pieces: ''.join(pieces))
+ return d
+
+
+class CloudError(Exception):
+ pass
+
+
+BACKOFF_SECONDS_FOR_5XX = (0, 2, 10)
+
+
+class ContainerRetryMixin:
+ """
+ I provide a helper method for performing an operation on a cloud container that will retry up to
+ len(BACKOFF_SECONDS_FOR_5XX) times (not including the initial try). If the initial try fails, a
+ single incident will be triggered after the operation has succeeded or failed.
+ """
+
+ def _do_request(self, description, operation, *args, **kwargs):
+ d = defer.maybeDeferred(operation, *args, **kwargs)
+ def _retry(f):
+ d2 = self._handle_error(f, 1, None, description, operation, *args, **kwargs)
+ def _trigger_incident(res):
+ log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s",
+ arguments=args[:2], kwargs=kwargs, description=description,
+ level=log.WEIRD)
+ return res
+ d2.addBoth(_trigger_incident)
+ return d2
+ d.addErrback(_retry)
+ return d
+
+ def _handle_error(self, f, trynum, first_err_and_tb, description, operation, *args, **kwargs):
+ f.trap(self.ServiceError)
+
+ # Don't use f.getTracebackObject() since a fake traceback will not do for the 3-arg form of 'raise'.
+ # tb can be None (which is acceptable for 3-arg raise) if we don't have a traceback.
+ tb = getattr(f, 'tb', None)
+ fargs = f.value.args
+ if len(fargs) > 2 and fargs[2] and '<code>signaturedoesnotmatch</code>' in fargs[2].lower():
+ fargs = fargs[:2] + ("SignatureDoesNotMatch response redacted",) + fargs[3:]
+
+ args_without_data = args[:2]
+ msg = "try %d failed: %s %s %s" % (trynum, description, args_without_data, kwargs)
+ err = CloudError(msg, *fargs)
+
+ # This should not trigger an incident; we want to do that at the end.
+ log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(fargs)s",
+ trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, fargs=repr(fargs),
+ level=log.INFREQUENT)
+
+ if first_err_and_tb is None:
+ first_err_and_tb = (err, tb)
+
+ if trynum > len(BACKOFF_SECONDS_FOR_5XX):
+ # If we run out of tries, raise the error we got on the first try (which *may* have
+ # a more useful traceback).
+ (first_err, first_tb) = first_err_and_tb
+ raise first_err.__class__, first_err, first_tb
+
+ fargs = f.value.args
+ if len(fargs) > 0 and int(fargs[0]) >= 500 and int(fargs[0]) < 600:
+ # Retry on 5xx errors.
+ d = task.deferLater(reactor, BACKOFF_SECONDS_FOR_5XX[trynum-1], operation, *args, **kwargs)
+ d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs)
+ return d
+
+ # If we get an error response other than a 5xx, raise that error even if it was on a retry.
+ raise err.__class__, err, tb
+
+
+def concat(seqs):
+ """
+ O(n), rather than O(n^2), concatenation of list-like things, returning a list.
+ I can't believe this isn't built in.
+ """
+ total_len = 0
+ for seq in seqs:
+ total_len += len(seq)
+ result = [None]*total_len
+ i = 0
+ for seq in seqs:
+ for x in seq:
+ result[i] = x
+ i += 1
+ _assert(i == total_len, i=i, total_len=total_len)
+ return result
+
+
+class ContainerListMixin:
+ """
+ S3 has a limitation of 1000 object entries returned on each list (GET Bucket) request.
+ I provide a helper method to repeat the call as many times as necessary to get a full
+ listing. The container is assumed to implement:
+
+ def list_some_objects(self, **kwargs):
+ # kwargs may include 'prefix' and 'marker' parameters as documented at
+ # <http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html>.
+ # returns Deferred ContainerListing
+
+ Note that list_some_objects is assumed to be reliable; so, if retries are needed,
+ the container class should also inherit from ContainerRetryMixin and list_some_objects
+ should make the request via _do_request.
+
+ The 'delimiter' parameter of the GET Bucket API is not supported.
+ """
+ def list_objects(self, prefix=''):
+ kwargs = {'prefix': prefix}
+ all_contents = deque()
+ def _list_some():
+ d2 = self.list_some_objects(**kwargs)
+ def _got_listing(res):
+ all_contents.append(res.contents)
+ if res.is_truncated == "true":
+ _assert(len(res.contents) > 0)
+ marker = res.contents[-1].key
+ _assert('marker' not in kwargs or marker > kwargs['marker'],
+ "Not making progress in list_objects", kwargs=kwargs, marker=marker)
+ kwargs['marker'] = marker
+ return _list_some()
+ else:
+ _assert(res.is_truncated == "false", is_truncated=res.is_truncated)
+ return res
+ d2.addCallback(_got_listing)
+ return d2
+
+ d = _list_some()
+ d.addCallback(lambda res: res.__class__(res.name, res.prefix, res.marker, res.max_keys,
+ "false", concat(all_contents)))
+ def _log(f):
+ log.msg(f, level=log.WEIRD)
+ return f
+ d.addErrback(_log)
+ return d
+
+
+class BackpressurePipeline(object):
+ """
+ I manage a pipeline of Deferred operations that allows the data source to feel backpressure
+ when the pipeline is "full". I do not actually limit the number of operations in progress.
+ """
+ OPEN = 0
+ CLOSING = 1
+ CLOSED = 2
+
+ def __init__(self, capacity):
+ self._capacity = capacity # how full we can be before causing calls to 'add' to block
+ self._gauge = 0 # how full we are
+ self._waiting = [] # callers of add() who are blocked
+ self._unfinished = 0 # number of pending operations
+ self._result_d = defer.Deferred()
+ self._state = self.OPEN
+
+ def add(self, _size, _func, *args, **kwargs):
+ if self._state == self.CLOSED:
+ msg = "add() called on closed BackpressurePipeline"
+ log.err(msg, level=log.WEIRD)
+ def _already_closed(): raise AssertionError(msg)
+ return defer.execute(_already_closed)
+ self._gauge += _size
+ self._unfinished += 1
+ fd = defer.maybeDeferred(_func, *args, **kwargs)
+ fd.addBoth(self._call_finished, _size)
+ fd.addErrback(log.err, "BackpressurePipeline._call_finished raised an exception")
+ if self._gauge < self._capacity:
+ return defer.succeed(None)
+ d = defer.Deferred()
+ self._waiting.append(d)
+ return d
+
+ def fail(self, f):
+ if self._state != self.CLOSED:
+ self._state = self.CLOSED
+ eventually_errback(self._result_d)(f)
+
+ def flush(self):
+ if self._state == self.CLOSED:
+ return defer.succeed(self._result_d)
+
+ d = self.close()
+ d.addBoth(self.reopen)
+ return d
+
+ def close(self):
+ if self._state != self.CLOSED:
+ if self._unfinished == 0:
+ self._state = self.CLOSED
+ eventually_callback(self._result_d)(None)
+ else:
+ self._state = self.CLOSING
+ return self._result_d
+
+ def reopen(self, res=None):
+ _assert(self._state == self.CLOSED, state=self._state)
+ self._result_d = defer.Deferred()
+ self._state = self.OPEN
+ return res
+
+ def _call_finished(self, res, size):
+ self._unfinished -= 1
+ self._gauge -= size
+ if isinstance(res, Failure):
+ self.fail(res)
+
+ if self._state == self.CLOSING:
+ # repeat the unfinished == 0 check
+ self.close()
+
+ if self._state == self.CLOSED:
+ while self._waiting:
+ d = self._waiting.pop(0)
+ eventual_chain(self._result_d, d)
+ elif self._gauge < self._capacity:
+ while self._waiting:
+ d = self._waiting.pop(0)
+ eventually_callback(d)(None)
+ return None
+
+
+class ChunkCache(object):
+ """I cache chunks for a specific share object."""
+
+ def __init__(self, container, key, chunksize, nchunks=1, initial_cachemap={}):
+ self._container = container
+ self._key = key
+ self._chunksize = chunksize
+ self._nchunks = nchunks
+
+ # chunknum -> deferred data
+ self._cachemap = initial_cachemap
+ self._pipeline = BackpressurePipeline(PIPELINE_DEPTH)
+
+ def set_nchunks(self, nchunks):
+ self._nchunks = nchunks
+
+ def _load_chunk(self, chunknum, chunkdata_d):
+ d = self._container.get_object(get_chunk_key(self._key, chunknum))
+ eventual_chain(source=d, target=chunkdata_d)
+ return d
+
+ def get(self, chunknum, result_d):
+ if chunknum in self._cachemap:
+ # cache hit; never stall
+ eventual_chain(source=self._cachemap[chunknum], target=result_d)
+ return defer.succeed(None)
+
+ # Evict any chunks other than the first and last two, until there are
+ # three or fewer chunks left cached.
+ for candidate_chunknum in self._cachemap.keys():
+ if len(self._cachemap) <= 3:
+ break
+ if candidate_chunknum not in (0, self._nchunks-2, self._nchunks-1):
+ self.flush_chunk(candidate_chunknum)
+
+ # cache miss; stall when the pipeline is full
+ chunkdata_d = defer.Deferred()
+ d = self._pipeline.add(1, self._load_chunk, chunknum, chunkdata_d)
+ def _check(res):
+ _assert(res is not None)
+ return res
+ chunkdata_d.addCallback(_check)
+ self._cachemap[chunknum] = chunkdata_d
+ eventual_chain(source=chunkdata_d, target=result_d)
+ return d
+
+ def flush_chunk(self, chunknum):
+ if chunknum in self._cachemap:
+ del self._cachemap[chunknum]
+
+ def close(self):
+ self._cachemap = None
+ return self._pipeline.close()
--- /dev/null
+
+import struct
+
+from cStringIO import StringIO
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IShareForReading, IShareForWriting
+
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.mathutil import div_ceil
+from allmydata.storage.common import UnknownImmutableContainerVersionError, DataTooLargeError
+from allmydata.storage.backends.cloud import cloud_common
+from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, \
+ BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin
+
+
+# Each share file (stored in the chunks with keys 'shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK')
+# contains lease information [currently inaccessible] and share data. The share data is
+# accessed by RIBucketWriter.write and RIBucketReader.read .
+
+# The share file has the following layout:
+# 0x00: share file version number, four bytes, current version is 1
+# 0x04: always zero (was share data length prior to Tahoe-LAFS v1.3.0)
+# 0x08: number of leases, four bytes big-endian
+# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
+# data_length + 0x0c: first lease. Each lease record is 72 bytes. (not used)
+
+
+class ImmutableCloudShareMixin:
+ sharetype = "immutable"
+ LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility
+ HEADER = ">LLL"
+ HEADER_SIZE = struct.calcsize(HEADER)
+ DATA_OFFSET = HEADER_SIZE
+
+
+class ImmutableCloudShareForWriting(CloudShareBase, ImmutableCloudShareMixin):
+ implements(IShareForWriting)
+
+ def __init__(self, container, storage_index, shnum, allocated_data_length, incomingset):
+ """
+ I won't allow more than allocated_data_length to be written to me.
+ """
+ precondition(isinstance(allocated_data_length, (int, long)), allocated_data_length)
+ CloudShareBase.__init__(self, container, storage_index, shnum)
+
+ self._chunksize = cloud_common.PREFERRED_CHUNK_SIZE
+ self._allocated_data_length = allocated_data_length
+
+ self._buf = StringIO()
+ # The second field, which was the four-byte share data length in
+ # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
+ # We also write 0 for the number of leases.
+ self._buf.write(struct.pack(self.HEADER, 1, 0, 0) )
+ self._set_size(self._buf.tell())
+ self._current_chunknum = 0
+
+ self._incomingset = incomingset
+ self._incomingset.add( (storage_index, shnum) )
+
+ self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH)
+
+ def _set_size(self, size):
+ self._total_size = size
+ self._data_length = size - self.DATA_OFFSET # no leases
+
+ def get_allocated_data_length(self):
+ return self._allocated_data_length
+
+ def write_share_data(self, offset, data):
+ """Write 'data' at position 'offset' past the end of the header."""
+ seekpos = self.DATA_OFFSET + offset
+ precondition(seekpos >= self._total_size, offset=offset, seekpos=seekpos, total_size=self._total_size)
+ if offset + len(data) > self._allocated_data_length:
+ raise DataTooLargeError(self._allocated_data_length, offset, len(data))
+
+ self._set_size(self._total_size + len(data))
+ return self._store_or_buffer( (seekpos, data, 0) )
+
+ def close(self):
+ chunkdata = self._buf.getvalue()
+ self._discard()
+ d = self._pipeline_store_next_chunk(chunkdata)
+ d.addCallback(lambda ign: self._pipeline.close())
+ return d
+
+ def _store_or_buffer(self, (seekpos, b, b_offset) ):
+ """
+ Helper method that stores the next complete chunk to the container or buffers
+ an incomplete chunk. The data still to be written is b[b_offset:], but we may
+ only process part of it in this call.
+ """
+ chunknum = seekpos / self._chunksize
+ offset_in_chunk = seekpos % self._chunksize
+
+ _assert(chunknum >= self._current_chunknum, seekpos=seekpos, chunknum=chunknum,
+ current_chunknum=self._current_chunknum)
+
+ if chunknum > self._current_chunknum or offset_in_chunk + (len(b) - b_offset) >= self._chunksize:
+ if chunknum > self._current_chunknum:
+ # The write left a gap that spans a chunk boundary. Fill with zeroes to the end
+ # of the current chunk and store it.
+ # TODO: test this case
+ self._buf.seek(self._chunksize - 1)
+ self._buf.write("\x00")
+ else:
+ # Store a complete chunk.
+ writelen = self._chunksize - offset_in_chunk
+ self._buf.seek(offset_in_chunk)
+ self._buf.write(b[b_offset : b_offset + writelen])
+ seekpos += writelen
+ b_offset += writelen
+
+ chunkdata = self._buf.getvalue()
+ self._buf = StringIO()
+ _assert(len(chunkdata) == self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize)
+
+ d2 = self._pipeline_store_next_chunk(chunkdata)
+ d2.addCallback(lambda ign: self._store_or_buffer( (seekpos, b, b_offset) ))
+ return d2
+ else:
+ # Buffer an incomplete chunk.
+ if b_offset > 0:
+ b = b[b_offset :]
+ self._buf.seek(offset_in_chunk)
+ self._buf.write(b)
+ return defer.succeed(None)
+
+ def _pipeline_store_next_chunk(self, chunkdata):
+ chunkkey = get_chunk_key(self._key, self._current_chunknum)
+ self._current_chunknum += 1
+ #print "STORING", chunkkey, len(chunkdata)
+
+ # We'd like to stream writes, but the supported service containers
+ # (and the IContainer interface) don't support that yet. For txaws, see
+ # https://bugs.launchpad.net/txaws/+bug/767205 and
+ # https://bugs.launchpad.net/txaws/+bug/783801
+ return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata)
+
+ def _discard(self):
+ self._buf = None
+ self._incomingset.discard( (self.get_storage_index(), self.get_shnum()) )
+
+
+class ImmutableCloudShareForReading(CloudShareBase, ImmutableCloudShareMixin, CloudShareReaderMixin):
+ implements(IShareForReading)
+
+ def __init__(self, container, storage_index, shnum, total_size, first_chunkdata):
+ CloudShareBase.__init__(self, container, storage_index, shnum)
+
+ precondition(isinstance(total_size, (int, long)), total_size=total_size)
+ precondition(isinstance(first_chunkdata, str), type(first_chunkdata))
+ precondition(len(first_chunkdata) <= total_size, len_first_chunkdata=len(first_chunkdata), total_size=total_size)
+
+ chunksize = len(first_chunkdata)
+ if chunksize < self.HEADER_SIZE:
+ msg = "%r had incomplete header (%d bytes)" % (self, chunksize)
+ raise UnknownImmutableContainerVersionError(msg)
+
+ self._total_size = total_size
+ self._chunksize = chunksize
+ nchunks = div_ceil(total_size, chunksize)
+ initial_cachemap = {0: defer.succeed(first_chunkdata)}
+ self._cache = ChunkCache(container, self._key, chunksize, nchunks, initial_cachemap)
+ #print "ImmutableCloudShareForReading", total_size, chunksize, self._key
+
+ header = first_chunkdata[:self.HEADER_SIZE]
+ (version, unused, num_leases) = struct.unpack(self.HEADER, header)
+
+ if version != 1:
+ msg = "%r had version %d but we wanted 1" % (self, version)
+ raise UnknownImmutableContainerVersionError(msg)
+
+ # We cannot write leases in share files, but allow them to be present
+ # in case a share file is copied from a disk backend, or in case we
+ # need them in future.
+ self._data_length = total_size - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
+
+ # TODO: raise a better exception.
+ _assert(self._data_length >= 0, data_length=self._data_length)
+
+ # Boilerplate is in CloudShareBase, read implementation is in CloudShareReaderMixin.
+ # So nothing to implement here. Yay!
+
+ def _discard(self):
+ pass
--- /dev/null
+
+import os.path
+
+from twisted.internet import defer
+from twisted.web.error import Error
+from allmydata.util.deferredutil import async_iterate
+
+from zope.interface import implements
+
+from allmydata.storage.backends.cloud.cloud_common import IContainer, \
+ ContainerRetryMixin, ContainerListMixin
+from allmydata.util.time_format import iso_utc
+from allmydata.util import fileutil
+
+
+MAX_KEYS = 1000
+
+
+def configure_mock_cloud_backend(storedir, config):
+ from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
+
+ container = MockContainer(storedir)
+ return CloudBackend(container)
+
+
+class MockContainer(ContainerRetryMixin, ContainerListMixin):
+ implements(IContainer)
+ """
+ I represent a mock cloud container that stores its data in the local filesystem.
+ I also keep track of the number of loads and stores.
+ """
+
+ def __init__(self, storagedir):
+ self._storagedir = storagedir
+ self.container_name = "MockContainer"
+ self.ServiceError = MockServiceError
+ self._load_count = 0
+ self._store_count = 0
+
+ fileutil.make_dirs(os.path.join(self._storagedir, "shares"))
+
+ def __repr__(self):
+ return ("<%s at %r>" % (self.__class__.__name__, self._storagedir,))
+
+ def _create(self):
+ return defer.execute(self._not_implemented)
+
+ def _delete(self):
+ return defer.execute(self._not_implemented)
+
+ def _iterate_dirs(self):
+ shares_dir = os.path.join(self._storagedir, "shares")
+ for prefixstr in sorted(fileutil.listdir(shares_dir)):
+ prefixkey = "shares/%s" % (prefixstr,)
+ prefixdir = os.path.join(shares_dir, prefixstr)
+ for sistr in sorted(fileutil.listdir(prefixdir)):
+ sikey = "%s/%s" % (prefixkey, sistr)
+ sidir = os.path.join(prefixdir, sistr)
+ for shnumstr in sorted(fileutil.listdir(sidir)):
+ sharefile = os.path.join(sidir, shnumstr)
+ yield (sharefile, "%s/%s" % (sikey, shnumstr))
+
+ def _list_some_objects(self, ign, prefix='', marker=None, max_keys=None):
+ if max_keys is None:
+ max_keys = MAX_KEYS
+ contents = []
+ def _next_share(res):
+ if res is None:
+ return
+ (sharefile, sharekey) = res
+ # note that all strings are > None
+ if sharekey.startswith(prefix) and sharekey > marker:
+ stat_result = os.stat(sharefile)
+ mtime_utc = iso_utc(stat_result.st_mtime, sep=' ')+'+00:00'
+ item = ContainerItem(key=sharekey, modification_date=mtime_utc, etag="",
+ size=stat_result.st_size, storage_class="STANDARD")
+ contents.append(item)
+ return len(contents) < max_keys
+
+ d = async_iterate(_next_share, self._iterate_dirs())
+ def _done(completed):
+ contents.sort(key=lambda item: item.key)
+ return ContainerListing(self.container_name, '', '', max_keys,
+ is_truncated=str(not completed).lower(), contents=contents)
+ d.addCallback(_done)
+ return d
+
+ def _get_path(self, object_name, must_exist=False):
+ # This method is also called by tests.
+ sharefile = os.path.join(self._storagedir, object_name)
+ if must_exist and not os.path.exists(sharefile):
+ raise MockServiceError("", 404, "not found")
+ return sharefile
+
+ def _put_object(self, ign, object_name, data, content_type, metadata):
+ assert content_type is None, content_type
+ assert metadata == {}, metadata
+ sharefile = self._get_path(object_name)
+ fileutil.make_dirs(os.path.dirname(sharefile))
+ fileutil.write(sharefile, data)
+ self._store_count += 1
+ return defer.succeed(None)
+
+ def _get_object(self, ign, object_name):
+ self._load_count += 1
+ data = fileutil.read(self._get_path(object_name, must_exist=True))
+ return defer.succeed(data)
+
+ def _head_object(self, ign, object_name):
+ return defer.execute(self._not_implemented)
+
+ def _delete_object(self, ign, object_name):
+ fileutil.remove(self._get_path(object_name, must_exist=True))
+ return defer.succeed(None)
+
+ def _not_implemented(self):
+ raise NotImplementedError
+
+ # methods that use error handling from ContainerRetryMixin
+
+ def create(self):
+ return self._do_request('create bucket', self._create, self.container_name)
+
+ def delete(self):
+ return self._do_request('delete bucket', self._delete, self.container_name)
+
+ def list_some_objects(self, **kwargs):
+ return self._do_request('list objects', self._list_some_objects, self.container_name, **kwargs)
+
+ def put_object(self, object_name, data, content_type=None, metadata={}):
+ return self._do_request('PUT object', self._put_object, self.container_name, object_name,
+ data, content_type, metadata)
+
+ def get_object(self, object_name):
+ return self._do_request('GET object', self._get_object, self.container_name, object_name)
+
+ def head_object(self, object_name):
+ return self._do_request('HEAD object', self._head_object, self.container_name, object_name)
+
+ def delete_object(self, object_name):
+ return self._do_request('DELETE object', self._delete_object, self.container_name, object_name)
+
+ def reset_load_store_counts(self):
+ self._load_count = 0
+ self._store_count = 0
+
+ def get_load_count(self):
+ return self._load_count
+
+ def get_store_count(self):
+ return self._store_count
+
+
+class MockServiceError(Error):
+ """
+ A error class similar to txaws' S3Error.
+ """
+ def __init__(self, xml_bytes, status, message=None, response=None, request_id="", host_id=""):
+ Error.__init__(self, status, message, response)
+ self.original = xml_bytes
+ self.status = str(status)
+ self.message = str(message)
+ self.request_id = request_id
+ self.host_id = host_id
+
+ def get_error_code(self):
+ return self.status
+
+ def get_error_message(self):
+ return self.message
+
+ def parse(self, xml_bytes=""):
+ raise NotImplementedError
+
+ def has_error(self, errorString):
+ raise NotImplementedError
+
+ def get_error_codes(self):
+ raise NotImplementedError
+
+ def get_error_messages(self):
+ raise NotImplementedError
+
+
+# Originally from txaws.s3.model (under different class names), which was under the MIT / Expat licence.
+
+class ContainerItem(object):
+ """
+ An item in a listing of cloud objects.
+ """
+ def __init__(self, key, modification_date, etag, size, storage_class,
+ owner=None):
+ self.key = key
+ self.modification_date = modification_date
+ self.etag = etag
+ self.size = size
+ self.storage_class = storage_class
+ self.owner = owner
+
+ def __repr__(self):
+ return "<ContainerItem %r>" % ({
+ "key": self.key,
+ "modification_date": self.modification_date,
+ "etag": self.etag,
+ "size": self.size,
+ "storage_class": self.storage_class,
+ "owner": self.owner,
+ },)
+
+
+class ContainerListing(object):
+ def __init__(self, name, prefix, marker, max_keys, is_truncated,
+ contents=None, common_prefixes=None):
+ self.name = name
+ self.prefix = prefix
+ self.marker = marker
+ self.max_keys = max_keys
+ self.is_truncated = is_truncated
+ self.contents = contents
+ self.common_prefixes = common_prefixes
+
+ def __repr__(self):
+ return "<ContainerListing %r>" % ({
+ "name": self.name,
+ "prefix": self.prefix,
+ "marker": self.marker,
+ "max_keys": self.max_keys,
+ "is_truncated": self.is_truncated,
+ "contents": self.contents,
+ "common_prefixes": self.common_prefixes,
+ })
--- /dev/null
+
+import struct
+from collections import deque
+
+from twisted.internet import defer
+from allmydata.util.deferredutil import gatherResults, async_iterate
+
+from zope.interface import implements
+
+from allmydata.interfaces import IMutableShare, BadWriteEnablerError
+from allmydata.util import idlib, log
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.util.mathutil import div_ceil
+from allmydata.util.hashutil import timing_safe_compare
+from allmydata.storage.common import UnknownMutableContainerVersionError, DataTooLargeError
+from allmydata.storage.backends.base import testv_compare
+from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.backends.cloud import cloud_common
+from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, get_zero_chunkdata, \
+ delete_chunks, BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin
+
+
+# Mutable shares have a different layout to immutable shares. See docs/mutable.rst
+# for more details.
+
+# # offset size name
+# 1 0 32 magic verstr "tahoe mutable container v1" plus binary
+# 2 32 20 write enabler's nodeid
+# 3 52 32 write enabler
+# 4 84 8 data size (actual share data present) (a)
+# 5 92 8 offset of (8) count of extra leases (after data)
+# 6 100 368 four leases, 92 bytes each, unused
+# 7 468 (a) data
+# 8 ?? 4 count of extra leases
+# 9 ?? n*92 extra leases
+
+
+# The struct module doc says that L's are 4 bytes in size, and that Q's are
+# 8 bytes in size. Since compatibility depends upon this, double-check it.
+assert struct.calcsize(">L") == 4, struct.calcsize(">L")
+assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
+
+
+class Namespace(object):
+ pass
+
+
+class MutableCloudShare(CloudShareBase, CloudShareReaderMixin):
+ implements(IMutableShare)
+
+ sharetype = "mutable"
+ DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
+ EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
+ HEADER = ">32s20s32sQQ"
+ HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases
+ LEASE_SIZE = struct.calcsize(">LL32s32s20s")
+ assert LEASE_SIZE == 92, LEASE_SIZE
+ DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+ assert DATA_OFFSET == 468, DATA_OFFSET
+ NUM_EXTRA_LEASES_SIZE = struct.calcsize(">L")
+
+ MAGIC = MUTABLE_MAGIC
+ assert len(MAGIC) == 32
+ MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
+
+ def __init__(self, container, storage_index, shnum, total_size, first_chunkdata, parent=None):
+ CloudShareBase.__init__(self, container, storage_index, shnum)
+
+ precondition(isinstance(total_size, (int, long)), total_size=total_size)
+ precondition(isinstance(first_chunkdata, str), type(first_chunkdata))
+ precondition(len(first_chunkdata) <= total_size, "total size is smaller than first chunk",
+ len_first_chunkdata=len(first_chunkdata), total_size=total_size)
+
+ if len(first_chunkdata) < self.HEADER_SIZE:
+ msg = "%r had incomplete header (%d bytes)" % (self, len(first_chunkdata))
+ raise UnknownMutableContainerVersionError(msg)
+
+ header = first_chunkdata[:self.HEADER_SIZE]
+ (magic, write_enabler_nodeid, real_write_enabler,
+ data_length, extra_lease_offset) = struct.unpack(self.HEADER, header)
+
+ if magic != self.MAGIC:
+ msg = "%r had magic %r but we wanted %r" % (self, magic, self.MAGIC)
+ raise UnknownMutableContainerVersionError(msg)
+
+ self._write_enabler_nodeid = write_enabler_nodeid
+ self._real_write_enabler = real_write_enabler
+
+ # We want to support changing PREFERRED_CHUNK_SIZE without breaking compatibility,
+ # but without "rechunking" any existing shares. Also, existing shares created by
+ # the pre-chunking code should be handled correctly.
+
+ # If there is more than one chunk, the chunksize must be equal to the size of the
+ # first chunk, to avoid rechunking.
+ self._chunksize = len(first_chunkdata)
+ if self._chunksize == total_size:
+ # There is only one chunk, so we are at liberty to make the chunksize larger
+ # than that chunk, but not smaller.
+ self._chunksize = max(self._chunksize, cloud_common.PREFERRED_CHUNK_SIZE)
+
+ self._zero_chunkdata = get_zero_chunkdata(self._chunksize)
+
+ initial_cachemap = {0: defer.succeed(first_chunkdata)}
+ self._cache = ChunkCache(container, self._key, self._chunksize, initial_cachemap=initial_cachemap)
+ #print "CONSTRUCT %s with %r" % (object.__repr__(self), self._cache)
+ self._data_length = data_length
+ self._set_total_size(self.DATA_OFFSET + data_length + self.NUM_EXTRA_LEASES_SIZE)
+
+ # The initial total size may not be less than the size of header + data + extra lease count.
+ # TODO: raise a better exception.
+ _assert(total_size >= self._total_size, share=repr(self),
+ total_size=total_size, self_total_size=self._total_size, data_length=data_length)
+ self._is_oversize = total_size > self._total_size
+
+ self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH)
+
+ self.parent = parent # for logging
+
+ def _set_total_size(self, total_size):
+ self._total_size = total_size
+ self._nchunks = div_ceil(self._total_size, self._chunksize)
+ self._cache.set_nchunks(self._nchunks)
+
+ def log(self, *args, **kwargs):
+ if self.parent:
+ return self.parent.log(*args, **kwargs)
+
+ @classmethod
+ def create_empty_share(cls, container, serverid, write_enabler, storage_index=None, shnum=None, parent=None):
+ # Unlike the disk backend, we don't check that the cloud object does not exist;
+ # we assume that it does not because create was used, and no-one else should be
+ # writing to the bucket.
+
+ # There are no extra leases, but for compatibility, the offset they would have
+ # still needs to be stored in the header.
+ data_length = 0
+ extra_lease_offset = cls.DATA_OFFSET + data_length
+ header = struct.pack(cls.HEADER, cls.MAGIC, serverid, write_enabler,
+ data_length, extra_lease_offset)
+ leases = "\x00"*(cls.LEASE_SIZE * 4)
+ extra_lease_count = struct.pack(">L", 0)
+ first_chunkdata = header + leases + extra_lease_count
+
+ share = cls(container, storage_index, shnum, len(first_chunkdata), first_chunkdata, parent=parent)
+
+ d = share._raw_writev(deque([(0, first_chunkdata)]), 0, 0)
+ d.addCallback(lambda ign: share)
+ return d
+
+ def _discard(self):
+ # TODO: discard read cache
+ pass
+
+ def check_write_enabler(self, write_enabler):
+ # avoid a timing attack
+ if not timing_safe_compare(write_enabler, self._real_write_enabler):
+ # accomodate share migration by reporting the nodeid used for the
+ # old write enabler.
+ self.log(format="bad write enabler on SI %(si)s,"
+ " recorded by nodeid %(nodeid)s",
+ facility="tahoe.storage",
+ level=log.WEIRD, umid="DF2fCR",
+ si=self.get_storage_index_string(),
+ nodeid=idlib.nodeid_b2a(self._write_enabler_nodeid))
+ msg = "The write enabler was recorded by nodeid '%s'." % \
+ (idlib.nodeid_b2a(self._write_enabler_nodeid),)
+ raise BadWriteEnablerError(msg)
+ return defer.succeed(None)
+
+ def check_testv(self, testv):
+ def _test( (offset, length, operator, specimen) ):
+ d = self.read_share_data(offset, length)
+ d.addCallback(lambda data: testv_compare(data, operator, specimen))
+ return d
+ return async_iterate(_test, sorted(testv))
+
+ def writev(self, datav, new_length):
+ precondition(new_length is None or new_length >= 0, new_length=new_length)
+
+ raw_datav, preserved_size, new_data_length = self._prepare_writev(datav, new_length)
+ return self._raw_writev(raw_datav, preserved_size, new_data_length)
+
+ def _prepare_writev(self, datav, new_length):
+ # Translate the client's write vector and 'new_length' into a "raw" write vector
+ # and new total size. This has no side effects to make it easier to test.
+
+ preserved_size = self.DATA_OFFSET + self._data_length
+
+ # chunk containing the byte after the current end-of-data
+ endofdata_chunknum = preserved_size / self._chunksize
+
+ # Whether we need to add a dummy write to zero-extend the end-of-data chunk.
+ ns = Namespace()
+ ns.need_zeroextend_write = preserved_size % self._chunksize != 0
+
+ raw_datav = deque()
+ def _add_write(seekpos, data):
+ #print "seekpos =", seekpos
+ raw_datav.append( (seekpos, data) )
+
+ lastpos = seekpos + len(data) - 1
+ start_chunknum = seekpos / self._chunksize
+ last_chunknum = lastpos / self._chunksize
+ if start_chunknum <= endofdata_chunknum and endofdata_chunknum <= last_chunknum:
+ # If any of the client's writes overlaps the end-of-data chunk, we should not
+ # add the zero-extending dummy write.
+ ns.need_zeroextend_write = False
+
+ #print "need_zeroextend_write =", ns.need_zeroextend_write
+ new_data_length = self._data_length
+
+ # Validate the write vector and translate its offsets into seek positions from
+ # the start of the share.
+ for (offset, data) in datav:
+ length = len(data)
+ precondition(offset >= 0, offset=offset)
+ if offset + length > self.MAX_SIZE:
+ raise DataTooLargeError()
+
+ if new_length is not None and new_length < offset + length:
+ length = max(0, new_length - offset)
+ data = data[: length]
+
+ new_data_length = max(new_data_length, offset + length)
+ if length > 0:
+ _add_write(self.DATA_OFFSET + offset, data)
+
+ # new_length can only be used to truncate, not extend.
+ if new_length is not None:
+ new_data_length = min(new_length, new_data_length)
+
+ # If the data length has changed, include additional raw writes to the data length
+ # field in the header, and to the extra lease count field after the data.
+ #
+ # Also do this if there were extra leases (e.g. if this was a share copied from a
+ # disk backend), so that they will be deleted. If the size hasn't changed and there
+ # are no extra leases, we don't bother to ensure that the extra lease count field is
+ # zero; it is ignored anyway.
+ if new_data_length != self._data_length or self._is_oversize:
+ extra_lease_offset = self.DATA_OFFSET + new_data_length
+
+ # Don't preserve old data past the new end-of-data.
+ preserved_size = min(preserved_size, extra_lease_offset)
+
+ # These are disjoint with any ranges already in raw_datav.
+ _add_write(self.DATA_LENGTH_OFFSET, struct.pack(">Q", new_data_length))
+ _add_write(extra_lease_offset, struct.pack(">L", 0))
+
+ #print "need_zeroextend_write =", ns.need_zeroextend_write
+ # If the data length is being increased and there are no other writes to the
+ # current end-of-data chunk (including the two we just added), add a dummy write
+ # of one zero byte at the end of that chunk. This will cause that chunk to be
+ # zero-extended to the full chunk size, which would not otherwise happen.
+ if new_data_length > self._data_length and ns.need_zeroextend_write:
+ _add_write((endofdata_chunknum + 1)*self._chunksize - 1, "\x00")
+
+ # Sorting the writes simplifies things (and we need all the simplification we can get :-)
+ raw_datav = deque(sorted(raw_datav, key=lambda (offset, data): offset))
+
+ # Complain if write vector elements overlap, that's too hard in general.
+ (last_seekpos, last_data) = (0, "")
+ have_duplicates = False
+ for (i, (seekpos, data)) in enumerate(raw_datav):
+ # The MDMF publisher in 1.9.0 and 1.9.1 produces duplicated writes to the MDMF header.
+ # If this is an exactly duplicated write, skip it.
+ if seekpos == last_seekpos and data == last_data:
+ raw_datav[i] = None
+ have_duplicates = True
+ else:
+ last_endpos = last_seekpos + len(last_data)
+ _assert(seekpos >= last_endpos, "overlapping write vector elements",
+ seekpos=seekpos, last_seekpos=last_seekpos, last_endpos=last_endpos)
+ (last_seekpos, last_data) = (seekpos, data)
+
+ if have_duplicates:
+ raw_datav.remove(None)
+
+ # Return a vector of writes to ranges in the share, the size of previous contents to
+ # be preserved, and the final data length.
+ return (raw_datav, preserved_size, new_data_length)
+
+ def _raw_writev(self, raw_datav, preserved_size, new_data_length):
+ #print "%r._raw_writev(%r, %r, %r)" % (self, raw_datav, preserved_size, new_data_length)
+
+ old_nchunks = self._nchunks
+
+ # The _total_size and _nchunks attributes are updated as each write is applied.
+ self._set_total_size(preserved_size)
+
+ final_size = self.DATA_OFFSET + new_data_length + self.NUM_EXTRA_LEASES_SIZE
+
+ d = self._raw_write_share_data(None, raw_datav, final_size)
+
+ def _resize(ign):
+ self._data_length = new_data_length
+ self._set_total_size(final_size)
+
+ if self._nchunks < old_nchunks or self._is_oversize:
+ self._is_oversize = False
+ #print "DELETING chunks from", self._nchunks
+ return delete_chunks(self._container, self._key, from_chunknum=self._nchunks)
+ d.addCallback(_resize)
+
+ d.addCallback(lambda ign: self._pipeline.flush())
+ return d
+
+ def _raw_write_share_data(self, ign, raw_datav, final_size):
+ """
+ raw_datav: (deque of (integer, str)) the remaining raw write vector
+ final_size: (integer) the size the file will be after all writes in the writev
+ """
+ #print "%r._raw_write_share_data(%r, %r)" % (self, (seekpos, data), final_size)
+
+ precondition(final_size >= 0, final_size=final_size)
+
+ d = defer.succeed(None)
+ if not raw_datav:
+ return d
+
+ (seekpos, data) = raw_datav.popleft()
+ _assert(seekpos >= 0 and len(data) > 0, seekpos=seekpos, len_data=len(data),
+ len_raw_datav=len(raw_datav), final_size=final_size)
+
+ # We *may* need to read the start chunk and/or last chunk before rewriting them.
+ # (If they are the same chunk, that's fine, the cache will ensure we don't
+ # read the cloud object twice.)
+ lastpos = seekpos + len(data) - 1
+ _assert(lastpos > 0, seekpos=seekpos, len_data=len(data), lastpos=lastpos)
+ start_chunknum = seekpos / self._chunksize
+ start_chunkpos = start_chunknum*self._chunksize
+ start_offset = seekpos % self._chunksize
+ last_chunknum = lastpos / self._chunksize
+ last_chunkpos = last_chunknum*self._chunksize
+ last_offset = lastpos % self._chunksize
+ _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum)
+
+ #print "lastpos =", lastpos
+ #print "len(data) =", len(data)
+ #print "start_chunknum =", start_chunknum
+ #print "start_offset =", start_offset
+ #print "last_chunknum =", last_chunknum
+ #print "last_offset =", last_offset
+ #print "_total_size =", self._total_size
+ #print "_chunksize =", self._chunksize
+ #print "_nchunks =", self._nchunks
+
+ start_chunkdata_d = defer.Deferred()
+ last_chunkdata_d = defer.Deferred()
+
+ # Is the first byte of the start chunk preserved?
+ if start_chunknum*self._chunksize < self._total_size and start_offset > 0:
+ # Yes, so we need to read it first.
+ d.addCallback(lambda ign: self._cache.get(start_chunknum, start_chunkdata_d))
+ else:
+ start_chunkdata_d.callback("")
+
+ # Is any byte of the last chunk preserved?
+ if last_chunkpos < self._total_size and lastpos < min(self._total_size, last_chunkpos + self._chunksize) - 1:
+ # Yes, so we need to read it first.
+ d.addCallback(lambda ign: self._cache.get(last_chunknum, last_chunkdata_d))
+ else:
+ last_chunkdata_d.callback("")
+
+ d.addCallback(lambda ign: gatherResults( (start_chunkdata_d, last_chunkdata_d) ))
+ def _got( (start_chunkdata, last_chunkdata) ):
+ #print "start_chunkdata =", len(start_chunkdata), repr(start_chunkdata)
+ #print "last_chunkdata =", len(last_chunkdata), repr(last_chunkdata)
+ d2 = defer.succeed(None)
+
+ # Zero any chunks from self._nchunks (i.e. after the last currently valid chunk)
+ # to before the start chunk of the write.
+ for zero_chunknum in xrange(self._nchunks, start_chunknum):
+ d2.addCallback(self._pipeline_store_chunk, zero_chunknum, self._zero_chunkdata)
+
+ # start_chunkdata and last_chunkdata may need to be truncated and/or zero-extended.
+ start_preserved = max(0, min(len(start_chunkdata), self._total_size - start_chunkpos, start_offset))
+ last_preserved = max(0, min(len(last_chunkdata), self._total_size - last_chunkpos))
+
+ start_chunkdata = (start_chunkdata[: start_preserved] +
+ self._zero_chunkdata[: max(0, start_offset - start_preserved)] +
+ data[: self._chunksize - start_offset])
+
+ # last_slice_len = len(last_chunkdata[last_offset + 1 : last_preserved])
+ last_slice_len = max(0, last_preserved - (last_offset + 1))
+ last_chunksize = min(final_size - last_chunkpos, self._chunksize)
+ last_chunkdata = (last_chunkdata[last_offset + 1 : last_preserved] +
+ self._zero_chunkdata[: max(0, last_chunksize - (last_offset + 1) - last_slice_len)])
+
+ # This loop eliminates redundant reads and writes, by merging the contents of writes
+ # after this one into last_chunkdata as far as possible. It ensures that we never need
+ # to read a chunk twice in the same writev (which is needed for correctness; see below).
+ while raw_datav:
+ # Does the next write start in the same chunk as this write ends (last_chunknum)?
+ (next_seekpos, next_chunkdata) = raw_datav[0]
+ next_start_chunknum = next_seekpos / self._chunksize
+ next_start_offset = next_seekpos % self._chunksize
+ next_lastpos = next_seekpos + len(next_chunkdata) - 1
+
+ if next_start_chunknum != last_chunknum:
+ break
+
+ _assert(next_start_offset > last_offset,
+ next_start_offset=next_start_offset, last_offset=last_offset)
+
+ # Cut next_chunkdata at the end of next_start_chunknum.
+ next_cutpos = (next_start_chunknum + 1)*self._chunksize
+ last_chunkdata = (last_chunkdata[: next_start_offset - (last_offset + 1)] +
+ next_chunkdata[: next_cutpos - next_seekpos] +
+ last_chunkdata[next_lastpos - lastpos :])
+
+ # Does the next write extend beyond that chunk?
+ if next_lastpos >= next_cutpos:
+ # The part after the cut will be processed in the next call to _raw_write_share_data.
+ raw_datav[0] = (next_cutpos, next_chunkdata[next_cutpos - next_seekpos :])
+ break
+ else:
+ # Discard the write that has already been processed.
+ raw_datav.popleft()
+
+ # start_chunknum and last_chunknum are going to be written, so need to be flushed
+ # from the read cache in case the new contents are needed by a subsequent readv
+ # or writev. (Due to the 'while raw_datav' loop above, we won't need to read them
+ # again in *this* writev. That property is needed for correctness because we don't
+ # flush the write pipeline until the end of the writev.)
+
+ d2.addCallback(lambda ign: self._cache.flush_chunk(start_chunkdata))
+ d2.addCallback(lambda ign: self._cache.flush_chunk(last_chunkdata))
+
+ # Now do the current write.
+ if last_chunknum == start_chunknum:
+ d2.addCallback(self._pipeline_store_chunk, start_chunknum,
+ start_chunkdata + last_chunkdata)
+ else:
+ d2.addCallback(self._pipeline_store_chunk, start_chunknum,
+ start_chunkdata)
+
+ for middle_chunknum in xrange(start_chunknum + 1, last_chunknum):
+ d2.addCallback(self._pipeline_store_chunk, middle_chunknum,
+ data[middle_chunknum*self._chunksize - seekpos
+ : (middle_chunknum + 1)*self._chunksize - seekpos])
+
+ d2.addCallback(self._pipeline_store_chunk, last_chunknum,
+ data[last_chunkpos - seekpos :] + last_chunkdata)
+ return d2
+ d.addCallback(_got)
+ d.addCallback(self._raw_write_share_data, raw_datav, final_size) # continue the iteration
+ return d
+
+ def _pipeline_store_chunk(self, ign, chunknum, chunkdata):
+ precondition(len(chunkdata) <= self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize)
+
+ chunkkey = get_chunk_key(self._key, chunknum)
+ #print "STORING", chunkkey, len(chunkdata), repr(chunkdata)
+
+ endpos = chunknum*self._chunksize + len(chunkdata)
+ if endpos > self._total_size:
+ self._set_total_size(endpos)
+
+ # We'd like to stream writes, but the supported service containers
+ # (and the IContainer interface) don't support that yet. For txaws, see
+ # https://bugs.launchpad.net/txaws/+bug/767205 and
+ # https://bugs.launchpad.net/txaws/+bug/783801
+ return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata)
+
+ def close(self):
+ # FIXME: 'close' doesn't exist in IMutableShare
+ self._discard()
+ d = self._pipeline.close()
+ d.addCallback(lambda ign: self._cache.close())
+ return d
\ No newline at end of file
--- /dev/null
+
+from zope.interface import implements
+
+from allmydata.node import InvalidValueError
+from allmydata.storage.backends.cloud.cloud_common import IContainer, \
+ ContainerRetryMixin, ContainerListMixin
+
+
+def configure_s3_container(storedir, config):
+ from allmydata.storage.backends.cloud.s3.s3_container import S3Container
+
+ accesskeyid = config.get_config("storage", "s3.access_key_id")
+ secretkey = config.get_or_create_private_config("s3secret")
+ usertoken = config.get_optional_private_config("s3usertoken")
+ producttoken = config.get_optional_private_config("s3producttoken")
+ if producttoken and not usertoken:
+ raise InvalidValueError("If private/s3producttoken is present, private/s3usertoken must also be present.")
+ url = config.get_config("storage", "s3.url", "http://s3.amazonaws.com")
+ container_name = config.get_config("storage", "s3.bucket")
+
+ return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken)
+
+
+class S3Container(ContainerRetryMixin, ContainerListMixin):
+ implements(IContainer)
+ """
+ I represent a real S3 container (bucket), accessed using the txaws library.
+ """
+
+ def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None):
+ # We only depend on txaws when this class is actually instantiated.
+ from txaws.credentials import AWSCredentials
+ from txaws.service import AWSServiceEndpoint
+ from txaws.s3.client import S3Client, Query
+ from txaws.s3.exception import S3Error
+
+ creds = AWSCredentials(access_key=access_key, secret_key=secret_key)
+ endpoint = AWSServiceEndpoint(uri=url)
+
+ query_factory = None
+ if usertoken is not None:
+ def make_query(*args, **kwargs):
+ amz_headers = kwargs.get("amz_headers", {})
+ if producttoken is not None:
+ amz_headers["security-token"] = (usertoken, producttoken)
+ else:
+ amz_headers["security-token"] = usertoken
+ kwargs["amz_headers"] = amz_headers
+
+ return Query(*args, **kwargs)
+ query_factory = make_query
+
+ self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory)
+ self.container_name = container_name
+ self.ServiceError = S3Error
+
+ def __repr__(self):
+ return ("<%s %r>" % (self.__class__.__name__, self.container_name,))
+
+ def create(self):
+ return self._do_request('create bucket', self.client.create, self.container_name)
+
+ def delete(self):
+ return self._do_request('delete bucket', self.client.delete, self.container_name)
+
+ def list_some_objects(self, **kwargs):
+ return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs)
+
+ def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+ return self._do_request('PUT object', self.client.put_object, self.container_name,
+ object_name, data, content_type, metadata)
+
+ def get_object(self, object_name):
+ return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
+
+ def head_object(self, object_name):
+ return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
+
+ def delete_object(self, object_name):
+ return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
+
+ def put_policy(self, policy):
+ """
+ Set access control policy on a bucket.
+ """
+ query = self.client.query_factory(
+ action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
+ bucket=self.container_name, object_name='?policy', data=policy)
+ return self._do_request('PUT policy', query.submit)
+
+ def get_policy(self):
+ query = self.client.query_factory(
+ action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
+ bucket=self.container_name, object_name='?policy')
+ return self._do_request('GET policy', query.submit)
+
+ def delete_policy(self):
+ query = self.client.query_factory(
+ action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
+ bucket=self.container_name, object_name='?policy')
+ return self._do_request('DELETE policy', query.submit)
--- /dev/null
+
+import struct, os.path
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet
+from allmydata.util import fileutil, log
+from allmydata.storage.common import si_b2a, si_a2b, NUM_RE, \
+ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.backends.base import Backend, ShareSet
+from allmydata.storage.backends.disk.immutable import load_immutable_disk_share, create_immutable_disk_share
+from allmydata.storage.backends.disk.mutable import load_mutable_disk_share, create_mutable_disk_share
+from allmydata.mutable.layout import MUTABLE_MAGIC
+
+
+# storage/
+# storage/shares/incoming
+# incoming/ holds temp dirs named $PREFIX/$STORAGEINDEX/$SHNUM which will
+# be moved to storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM upon success
+# storage/shares/$PREFIX/$STORAGEINDEX
+# storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM
+
+# where "$PREFIX" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
+# base-32 chars).
+
+
+def si_si2dir(startdir, storage_index):
+ sia = si_b2a(storage_index)
+ return os.path.join(startdir, sia[:2], sia)
+
+def get_disk_share(home, storage_index=None, shnum=None):
+ f = open(home, 'rb')
+ try:
+ prefix = f.read(len(MUTABLE_MAGIC))
+ finally:
+ f.close()
+
+ if prefix == MUTABLE_MAGIC:
+ return load_mutable_disk_share(home, storage_index, shnum)
+ else:
+ # assume it's immutable
+ return load_immutable_disk_share(home, storage_index, shnum)
+
+
+def configure_disk_backend(storedir, config):
+ readonly = config.get_config("storage", "readonly", False, boolean=True)
+ reserved_space = config.get_config_size("storage", "reserved_space", "0")
+
+ return DiskBackend(storedir, readonly, reserved_space)
+
+
+class DiskBackend(Backend):
+ implements(IStorageBackend)
+
+ def __init__(self, storedir, readonly=False, reserved_space=0):
+ Backend.__init__(self)
+ self._storedir = storedir
+ self._readonly = readonly
+ self._reserved_space = int(reserved_space)
+ self._sharedir = os.path.join(self._storedir, 'shares')
+ fileutil.make_dirs(self._sharedir)
+ self._incomingdir = os.path.join(self._sharedir, 'incoming')
+ self._clean_incomplete()
+ if self._reserved_space and (self.get_available_space() is None):
+ log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
+ umid="0wZ27w", level=log.UNUSUAL)
+
+ def _clean_incomplete(self):
+ fileutil.rm_dir(self._incomingdir)
+ fileutil.make_dirs(self._incomingdir)
+
+ def get_sharesets_for_prefix(self, prefix):
+ prefixdir = os.path.join(self._sharedir, prefix)
+ sharesets = [self.get_shareset(si_a2b(si_s))
+ for si_s in sorted(fileutil.listdir(prefixdir))]
+ return defer.succeed(sharesets)
+
+ def get_shareset(self, storage_index):
+ sharehomedir = si_si2dir(self._sharedir, storage_index)
+ incominghomedir = si_si2dir(self._incomingdir, storage_index)
+ return DiskShareSet(storage_index, sharehomedir, incominghomedir)
+
+ def fill_in_space_stats(self, stats):
+ stats['storage_server.reserved_space'] = self._reserved_space
+ try:
+ disk = fileutil.get_disk_stats(self._sharedir, self._reserved_space)
+ writeable = disk['avail'] > 0
+
+ # spacetime predictors should use disk_avail / (d(disk_used)/dt)
+ stats['storage_server.disk_total'] = disk['total']
+ stats['storage_server.disk_used'] = disk['used']
+ stats['storage_server.disk_free_for_root'] = disk['free_for_root']
+ stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
+ stats['storage_server.disk_avail'] = disk['avail']
+ except AttributeError:
+ writeable = True
+ except EnvironmentError:
+ log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
+ writeable = False
+
+ if self._readonly:
+ stats['storage_server.disk_avail'] = 0
+ writeable = False
+
+ stats['storage_server.accepting_immutable_shares'] = int(writeable)
+
+ def get_available_space(self):
+ if self._readonly:
+ return 0
+ try:
+ return fileutil.get_available_space(self._sharedir, self._reserved_space)
+ except EnvironmentError:
+ return 0
+
+ def must_use_tubid_as_permutation_seed(self):
+ # A disk backend with existing shares must assume that it was around before #466,
+ # so must use its TubID as a permutation-seed.
+ return bool(set(fileutil.listdir(self._sharedir)) - set(["incoming"]))
+
+
+class DiskShareSet(ShareSet):
+ implements(IShareSet)
+
+ def __init__(self, storage_index, sharehomedir, incominghomedir=None):
+ ShareSet.__init__(self, storage_index)
+ self._sharehomedir = sharehomedir
+ self._incominghomedir = incominghomedir
+
+ def get_overhead(self):
+ return (fileutil.get_used_space(self._sharehomedir) +
+ fileutil.get_used_space(self._incominghomedir))
+
+ def get_shares(self):
+ si = self.get_storage_index()
+ shares = {}
+ corrupted = set()
+ for shnumstr in fileutil.listdir(self._sharehomedir, filter=NUM_RE):
+ shnum = int(shnumstr)
+ sharefile = os.path.join(self._sharehomedir, shnumstr)
+ try:
+ shares[shnum] = get_disk_share(sharefile, si, shnum)
+ except (UnknownMutableContainerVersionError,
+ UnknownImmutableContainerVersionError,
+ struct.error):
+ corrupted.add(shnum)
+
+ valid = [shares[shnum] for shnum in sorted(shares.keys())]
+ return defer.succeed( (valid, corrupted) )
+
+ def get_share(self, shnum):
+ return get_disk_share(os.path.join(self._sharehomedir, str(shnum)),
+ self.get_storage_index(), shnum)
+
+ def delete_share(self, shnum):
+ fileutil.remove(os.path.join(self._sharehomedir, str(shnum)))
+ return defer.succeed(None)
+
+ def has_incoming(self, shnum):
+ if self._incominghomedir is None:
+ return False
+ return os.path.exists(os.path.join(self._incominghomedir, str(shnum)))
+
+ def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+ finalhome = os.path.join(self._sharehomedir, str(shnum))
+ incominghome = os.path.join(self._incominghomedir, str(shnum))
+ immsh = create_immutable_disk_share(incominghome, finalhome, allocated_data_length,
+ self.get_storage_index(), shnum)
+ bw = BucketWriter(account, immsh, canary)
+ return bw
+
+ def _create_mutable_share(self, account, shnum, write_enabler):
+ fileutil.make_dirs(self._sharehomedir)
+ sharehome = os.path.join(self._sharehomedir, str(shnum))
+ serverid = account.server.get_serverid()
+ return create_mutable_disk_share(sharehome, serverid, write_enabler,
+ self.get_storage_index(), shnum, parent=account.server)
+
+ def _clean_up_after_unlink(self):
+ fileutil.rmdir_if_empty(self._sharehomedir)
+
+ def _get_sharedir(self):
+ return self._sharehomedir
--- /dev/null
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IStorageBackend, IShareSet, IShareBase, \
+ IShareForReading, IShareForWriting, IMutableShare
+
+from allmydata.util.assertutil import precondition
+from allmydata.storage.backends.base import Backend, ShareSet, empty_check_testv
+from allmydata.storage.bucket import BucketWriter
+from allmydata.storage.common import si_b2a
+
+
+def configure_null_backend(storedir, config):
+ return NullBackend()
+
+
+class NullBackend(Backend):
+ implements(IStorageBackend)
+ """
+ I am a test backend that records (in memory) which shares exist, but not their contents, leases,
+ or write-enablers.
+ """
+
+ def __init__(self):
+ Backend.__init__(self)
+ # mapping from storage_index to NullShareSet
+ self._sharesets = {}
+
+ def get_available_space(self):
+ return None
+
+ def get_sharesets_for_prefix(self, prefix):
+ sharesets = []
+ for (si, shareset) in self._sharesets.iteritems():
+ if si_b2a(si).startswith(prefix):
+ sharesets.append(shareset)
+
+ def _by_base32si(b):
+ return b.get_storage_index_string()
+ sharesets.sort(key=_by_base32si)
+ return defer.succeed(sharesets)
+
+ def get_shareset(self, storage_index):
+ shareset = self._sharesets.get(storage_index, None)
+ if shareset is None:
+ shareset = NullShareSet(storage_index)
+ self._sharesets[storage_index] = shareset
+ return shareset
+
+ def fill_in_space_stats(self, stats):
+ pass
+
+
+class NullShareSet(ShareSet):
+ implements(IShareSet)
+
+ def __init__(self, storage_index):
+ self.storage_index = storage_index
+ self._incoming_shnums = set()
+ self._immutable_shnums = set()
+ self._mutable_shnums = set()
+
+ def close_shnum(self, shnum):
+ self._incoming_shnums.remove(shnum)
+ self._immutable_shnums.add(shnum)
+ return defer.succeed(None)
+
+ def get_overhead(self):
+ return 0
+
+ def get_shares(self):
+ shares = {}
+ for shnum in self._immutable_shnums:
+ shares[shnum] = ImmutableNullShare(self, shnum)
+ for shnum in self._mutable_shnums:
+ shares[shnum] = MutableNullShare(self, shnum)
+ # This backend never has any corrupt shares.
+ return defer.succeed( ([shares[shnum] for shnum in sorted(shares.keys())], set()) )
+
+ def get_share(self, shnum):
+ if shnum in self._immutable_shnums:
+ return defer.succeed(ImmutableNullShare(self, shnum))
+ elif shnum in self._mutable_shnums:
+ return defer.succeed(MutableNullShare(self, shnum))
+ else:
+ def _not_found(): raise IndexError("no such share %d" % (shnum,))
+ return defer.execute(_not_found)
+
+ def delete_share(self, shnum, include_incoming=False):
+ if include_incoming and (shnum in self._incoming_shnums):
+ self._incoming_shnums.remove(shnum)
+ if shnum in self._immutable_shnums:
+ self._immutable_shnums.remove(shnum)
+ if shnum in self._mutable_shnums:
+ self._mutable_shnums.remove(shnum)
+ return defer.succeed(None)
+
+ def has_incoming(self, shnum):
+ return shnum in self._incoming_shnums
+
+ def get_storage_index(self):
+ return self.storage_index
+
+ def get_storage_index_string(self):
+ return si_b2a(self.storage_index)
+
+ def make_bucket_writer(self, account, shnum, allocated_data_length, canary):
+ self._incoming_shnums.add(shnum)
+ immutableshare = ImmutableNullShare(self, shnum)
+ bw = BucketWriter(account, immutableshare, canary)
+ bw.throw_out_all_data = True
+ return bw
+
+
+class NullShareBase(object):
+ implements(IShareBase)
+
+ def __init__(self, shareset, shnum):
+ self.shareset = shareset
+ self.shnum = shnum
+
+ def get_storage_index(self):
+ return self.shareset.get_storage_index()
+
+ def get_storage_index_string(self):
+ return self.shareset.get_storage_index_string()
+
+ def get_shnum(self):
+ return self.shnum
+
+ def get_data_length(self):
+ return 0
+
+ def get_size(self):
+ return 0
+
+ def get_used_space(self):
+ return 0
+
+ def unlink(self):
+ return self.shareset.delete_share(self.shnum, include_incoming=True)
+
+ def readv(self, readv):
+ datav = []
+ for (offset, length) in readv:
+ datav.append("")
+ return defer.succeed(datav)
+
+ def get_leases(self):
+ pass
+
+ def add_lease(self, lease):
+ pass
+
+ def renew_lease(self, renew_secret, new_expire_time):
+ raise IndexError("unable to renew non-existent lease")
+
+ def add_or_renew_lease(self, lease_info):
+ pass
+
+
+class ImmutableNullShare(NullShareBase):
+ implements(IShareForReading, IShareForWriting)
+ sharetype = "immutable"
+
+ def read_share_data(self, offset, length):
+ precondition(offset >= 0)
+ return defer.succeed("")
+
+ def get_allocated_data_length(self):
+ return 0
+
+ def write_share_data(self, offset, data):
+ return defer.succeed(None)
+
+ def close(self):
+ return self.shareset.close_shnum(self.shnum)
+
+
+class MutableNullShare(NullShareBase):
+ implements(IMutableShare)
+ sharetype = "mutable"
+
+ def check_write_enabler(self, write_enabler):
+ # Null backend doesn't check write enablers.
+ return defer.succeed(None)
+
+ def check_testv(self, testv):
+ return defer.succeed(empty_check_testv(testv))
+
+ def writev(self, datav, new_length):
+ return defer.succeed(None)