From: Daira Hopwood Date: Wed, 9 Apr 2014 00:33:33 +0000 (+0100) Subject: Add new files for cloud merge (rebased). X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/index.php?a=commitdiff_plain;h=cec130f2393a1b6a6e0469aa11dd2c5cb04aeb14;p=tahoe-lafs%2Ftahoe-lafs.git Add new files for cloud merge (rebased). Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/storage/backends/base.py b/src/allmydata/storage/backends/base.py new file mode 100644 index 00000000..b56266c8 --- /dev/null +++ b/src/allmydata/storage/backends/base.py @@ -0,0 +1,208 @@ + +from twisted.application import service +from twisted.internet import defer + +from allmydata.util.deferredutil import async_iterate, gatherResults +from allmydata.storage.common import si_b2a +from allmydata.storage.bucket import BucketReader +from allmydata.storage.leasedb import SHARETYPE_MUTABLE + + +class Backend(service.MultiService): + def __init__(self): + service.MultiService.__init__(self) + + def must_use_tubid_as_permutation_seed(self): + # New backends cannot have been around before #466, and so have no backward + # compatibility requirements for permutation seeds. The disk backend overrides this. + return False + + +class ShareSet(object): + """ + This class implements shareset logic that could work for all backends, but + might be useful to override for efficiency. + """ + # TODO: queue operations on a shareset to ensure atomicity for each fully + # successful operation (#1869). + + def __init__(self, storage_index): + self.storage_index = storage_index + + def get_storage_index(self): + return self.storage_index + + def get_storage_index_string(self): + return si_b2a(self.storage_index) + + def make_bucket_reader(self, account, share): + return BucketReader(account, share) + + def testv_and_readv_and_writev(self, write_enabler, + test_and_write_vectors, read_vector, + expiration_time, account): + # The implementation here depends on the following helper methods, + # which must be provided by subclasses: + # + # def _clean_up_after_unlink(self): + # """clean up resources associated with the shareset after some + # shares might have been deleted""" + # + # def _create_mutable_share(self, account, shnum, write_enabler): + # """create a mutable share with the given shnum and write_enabler""" + + sharemap = {} + d = self.get_shares() + def _got_shares( (shares, corrupted) ): + d2 = defer.succeed(None) + for share in shares: + assert not isinstance(share, defer.Deferred), share + # XXX is it correct to ignore immutable shares? Maybe get_shares should + # have a parameter saying what type it's expecting. + if share.sharetype == "mutable": + d2.addCallback(lambda ign, share=share: share.check_write_enabler(write_enabler)) + sharemap[share.get_shnum()] = share + + shnums = sorted(sharemap.keys()) + + # if d2 does not fail, write_enabler is good for all existing shares + + # now evaluate test vectors + def _check_testv(shnum): + (testv, datav, new_length) = test_and_write_vectors[shnum] + if shnum in sharemap: + d3 = sharemap[shnum].check_testv(testv) + elif shnum in corrupted: + # a corrupted share does not match any test vector + d3 = defer.succeed(False) + else: + # compare the vectors against an empty share, in which all + # reads return empty strings + d3 = defer.succeed(empty_check_testv(testv)) + + def _check_result(res): + if not res: + account.server.log("testv failed: [%d] %r" % (shnum, testv)) + return res + d3.addCallback(_check_result) + return d3 + + d2.addCallback(lambda ign: async_iterate(_check_testv, test_and_write_vectors)) + + def _gather(testv_is_good): + # Gather the read vectors, before we do any writes. This ignores any + # corrupted shares. + d3 = gatherResults([sharemap[shnum].readv(read_vector) for shnum in shnums]) + + def _do_writes(reads): + read_data = {} + for i in range(len(shnums)): + read_data[shnums[i]] = reads[i] + + d4 = defer.succeed(None) + if testv_is_good: + if len(set(test_and_write_vectors.keys()) & corrupted) > 0: + # XXX think of a better exception to raise + raise AssertionError("You asked to write share numbers %r of storage index %r, " + "but one or more of those is corrupt (numbers %r)" + % (list(sorted(test_and_write_vectors.keys())), + self.get_storage_index_string(), + list(sorted(corrupted))) ) + + # now apply the write vectors + for shnum in test_and_write_vectors: + (testv, datav, new_length) = test_and_write_vectors[shnum] + if new_length == 0: + if shnum in sharemap: + d4.addCallback(lambda ign, shnum=shnum: + sharemap[shnum].unlink()) + d4.addCallback(lambda ign, shnum=shnum: + account.remove_share_and_leases(self.storage_index, shnum)) + else: + if shnum not in sharemap: + # allocate a new share + d4.addCallback(lambda ign, shnum=shnum: + self._create_mutable_share(account, shnum, + write_enabler)) + def _record_share(share, shnum=shnum): + sharemap[shnum] = share + account.add_share(self.storage_index, shnum, share.get_used_space(), + SHARETYPE_MUTABLE) + d4.addCallback(_record_share) + d4.addCallback(lambda ign, shnum=shnum, datav=datav, new_length=new_length: + sharemap[shnum].writev(datav, new_length)) + def _update_lease(ign, shnum=shnum): + account.add_or_renew_default_lease(self.storage_index, shnum) + account.mark_share_as_stable(self.storage_index, shnum, + sharemap[shnum].get_used_space()) + d4.addCallback(_update_lease) + + if new_length == 0: + d4.addCallback(lambda ign: self._clean_up_after_unlink()) + + d4.addCallback(lambda ign: (testv_is_good, read_data)) + return d4 + d3.addCallback(_do_writes) + return d3 + d2.addCallback(_gather) + return d2 + d.addCallback(_got_shares) + return d + + def readv(self, wanted_shnums, read_vector): + """ + Read a vector from the numbered shares in this shareset. An empty + shares list means to return data from all known shares. + + @param wanted_shnums=ListOf(int) + @param read_vector=ReadVector + @return DictOf(int, ReadData): shnum -> results, with one key per share + """ + shnums = [] + dreads = [] + d = self.get_shares() + def _got_shares( (shares, corrupted) ): + # We ignore corrupted shares. + for share in shares: + assert not isinstance(share, defer.Deferred), share + shnum = share.get_shnum() + if not wanted_shnums or shnum in wanted_shnums: + shnums.append(share.get_shnum()) + dreads.append(share.readv(read_vector)) + return gatherResults(dreads) + d.addCallback(_got_shares) + + def _got_reads(reads): + datavs = {} + for i in range(len(shnums)): + datavs[shnums[i]] = reads[i] + return datavs + d.addCallback(_got_reads) + return d + + +def testv_compare(a, op, b): + assert op in ("lt", "le", "eq", "ne", "ge", "gt") + if op == "lt": + return a < b + if op == "le": + return a <= b + if op == "eq": + return a == b + if op == "ne": + return a != b + if op == "ge": + return a >= b + if op == "gt": + return a > b + # never reached + + +def empty_check_testv(testv): + test_good = True + for (offset, length, operator, specimen) in testv: + data = "" + if not testv_compare(data, operator, specimen): + test_good = False + break + return test_good diff --git a/src/allmydata/storage/backends/cloud/cloud_backend.py b/src/allmydata/storage/backends/cloud/cloud_backend.py new file mode 100644 index 00000000..4fcb0dc5 --- /dev/null +++ b/src/allmydata/storage/backends/cloud/cloud_backend.py @@ -0,0 +1,174 @@ + +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IStorageBackend, IShareSet + +from allmydata.node import InvalidValueError +from allmydata.util.deferredutil import gatherResults +from allmydata.util.assertutil import _assert +from allmydata.util.dictutil import NumDict +from allmydata.util.encodingutil import quote_output +from allmydata.storage.common import si_a2b, NUM_RE +from allmydata.storage.bucket import BucketWriter +from allmydata.storage.backends.base import Backend, ShareSet +from allmydata.storage.backends.cloud.immutable import ImmutableCloudShareForReading, ImmutableCloudShareForWriting +from allmydata.storage.backends.cloud.mutable import MutableCloudShare +from allmydata.storage.backends.cloud.cloud_common import get_share_key, delete_chunks +from allmydata.mutable.layout import MUTABLE_MAGIC + + +def get_cloud_share(container, storage_index, shnum, total_size): + key = get_share_key(storage_index, shnum) + d = container.get_object(key) + def _make_share(first_chunkdata): + if first_chunkdata.startswith(MUTABLE_MAGIC): + return MutableCloudShare(container, storage_index, shnum, total_size, first_chunkdata) + else: + # assume it's immutable + return ImmutableCloudShareForReading(container, storage_index, shnum, total_size, first_chunkdata) + d.addCallback(_make_share) + return d + + +def configure_cloud_backend(storedir, config): + # REMIND: when multiple container implementations are supported, only import the container we're going to use. + from allmydata.storage.backends.cloud.s3.s3_container import configure_s3_container + + if config.get_config("storage", "readonly", False, boolean=True): + raise InvalidValueError("[storage]readonly is not supported by the cloud backend; " + "make the container read-only instead.") + + backendtype = config.get_config("storage", "backend", "disk") + if backendtype == "s3": + backendtype = "cloud.s3" + + container_configurators = { + 'cloud.s3': configure_s3_container, + } + + if backendtype not in container_configurators: + raise InvalidValueError("%s is not supported by the cloud backend; it must be one of %s" + % (quote_output("[storage]backend = " + backendtype), container_configurators.keys()) ) + + container = container_configurators[backendtype](storedir, config) + return CloudBackend(container) + + +class CloudBackend(Backend): + implements(IStorageBackend) + + def __init__(self, container): + Backend.__init__(self) + self._container = container + + # set of (storage_index, shnum) of incoming shares + self._incomingset = set() + + def get_sharesets_for_prefix(self, prefix): + d = self._container.list_objects(prefix='shares/%s/' % (prefix,)) + def _get_sharesets(res): + # XXX this enumerates all shares to get the set of SIs. + # Is there a way to enumerate SIs more efficiently? + si_strings = set() + for item in res.contents: + # XXX better error handling + path = item.key.split('/') + _assert(path[0:2] == ["shares", prefix], path=path, prefix=prefix) + si_strings.add(path[2]) + + # XXX we want this to be deterministic, so we return the sharesets sorted + # by their si_strings, but we shouldn't need to explicitly re-sort them + # because list_objects returns a sorted list. + return [CloudShareSet(si_a2b(s), self._container, self._incomingset) for s in sorted(si_strings)] + d.addCallback(_get_sharesets) + return d + + def get_shareset(self, storage_index): + return CloudShareSet(storage_index, self._container, self._incomingset) + + def fill_in_space_stats(self, stats): + # TODO: query space usage of container if supported. + # TODO: query whether the container is read-only and set + # accepting_immutable_shares accordingly. + stats['storage_server.accepting_immutable_shares'] = 1 + + def get_available_space(self): + # TODO: query space usage of container if supported. + return 2**64 + + +class CloudShareSet(ShareSet): + implements(IShareSet) + + def __init__(self, storage_index, container, incomingset): + ShareSet.__init__(self, storage_index) + self._container = container + self._incomingset = incomingset + self._key = get_share_key(storage_index) + + def get_overhead(self): + return 0 + + def get_shares(self): + d = self._container.list_objects(prefix=self._key) + def _get_shares(res): + si = self.get_storage_index() + shnum_to_total_size = NumDict() + for item in res.contents: + key = item.key + _assert(key.startswith(self._key), key=key, self_key=self._key) + path = key.split('/') + if len(path) == 4: + (shnumstr, _, chunknumstr) = path[3].partition('.') + chunknumstr = chunknumstr or '0' + if NUM_RE.match(shnumstr) and NUM_RE.match(chunknumstr): + # The size is taken as the sum of sizes for all chunks, but for simplicity + # we don't check here that the individual chunk sizes match expectations. + # If they don't, that will cause an error on reading. + shnum_to_total_size.add_num(int(shnumstr), int(item.size)) + + return gatherResults([get_cloud_share(self._container, si, shnum, total_size) + for (shnum, total_size) in shnum_to_total_size.items_sorted_by_key()]) + d.addCallback(_get_shares) + # TODO: return information about corrupt shares. + d.addCallback(lambda shares: (shares, set()) ) + return d + + def get_share(self, shnum): + key = "%s%d" % (self._key, shnum) + d = self._container.list_objects(prefix=key) + def _get_share(res): + total_size = 0 + for item in res.contents: + total_size += item.size + return get_cloud_share(self._container, self.get_storage_index(), shnum, total_size) + d.addCallback(_get_share) + return d + + def delete_share(self, shnum): + key = "%s%d" % (self._key, shnum) + return delete_chunks(self._container, key) + + def has_incoming(self, shnum): + return (self.get_storage_index(), shnum) in self._incomingset + + def make_bucket_writer(self, account, shnum, allocated_data_length, canary): + immsh = ImmutableCloudShareForWriting(self._container, self.get_storage_index(), shnum, + allocated_data_length, self._incomingset) + d = defer.succeed(None) + d.addCallback(lambda ign: BucketWriter(account, immsh, canary)) + return d + + def _create_mutable_share(self, account, shnum, write_enabler): + serverid = account.server.get_serverid() + return MutableCloudShare.create_empty_share(self._container, serverid, write_enabler, + self.get_storage_index(), shnum, parent=account.server) + + def _clean_up_after_unlink(self): + pass + + def _get_sharedir(self): + # For use by tests, only with the mock cloud backend. + # It is OK that _get_path doesn't exist on real container objects. + return self._container._get_path(self._key) diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py new file mode 100644 index 00000000..d1d5b18d --- /dev/null +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -0,0 +1,507 @@ + +from collections import deque + +from twisted.internet import defer, reactor, task +from twisted.python.failure import Failure + +from zope.interface import Interface, implements +from allmydata.interfaces import IShareBase + +from allmydata.util import log +from allmydata.util.assertutil import precondition, _assert +from allmydata.util.deferredutil import eventually_callback, eventually_errback, eventual_chain, gatherResults +from allmydata.storage.common import si_b2a, NUM_RE + + +# The container has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK + +def get_share_key(si, shnum=None): + sistr = si_b2a(si) + if shnum is None: + return "shares/%s/%s/" % (sistr[:2], sistr) + else: + return "shares/%s/%s/%d" % (sistr[:2], sistr, shnum) + +def get_chunk_key(share_key, chunknum): + precondition(chunknum >= 0, chunknum=chunknum) + if chunknum == 0: + return share_key + else: + return "%s.%d" % (share_key, chunknum) + + +PREFERRED_CHUNK_SIZE = 512*1024 +PIPELINE_DEPTH = 4 + +ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE + +def get_zero_chunkdata(size): + if size <= PREFERRED_CHUNK_SIZE: + return ZERO_CHUNKDATA[: size] + else: + return "\x00"*size + + +class IContainer(Interface): + """ + I represent a cloud container. + """ + def create(): + """ + Create this container. + """ + + def delete(): + """ + Delete this container. + The cloud service may require the container to be empty before it can be deleted. + """ + + def list_objects(prefix=''): + """ + Get a ContainerListing that lists objects in this container. + + prefix: (str) limit the returned keys to those starting with prefix. + """ + + def put_object(object_name, data, content_type=None, metadata={}): + """ + Put an object in this bucket. + Any existing object of the same name will be replaced. + """ + + def get_object(object_name): + """ + Get an object from this container. + """ + + def head_object(object_name): + """ + Retrieve object metadata only. + """ + + def delete_object(object_name): + """ + Delete an object from this container. + Once deleted, there is no method to restore or undelete an object. + """ + + +def delete_chunks(container, share_key, from_chunknum=0): + d = container.list_objects(prefix=share_key) + def _delete(res): + def _suppress_404(f): + e = f.trap(container.ServiceError) + if e.get_error_code() != 404: + return f + + d2 = defer.succeed(None) + for item in res.contents: + key = item.key + _assert(key.startswith(share_key), key=key, share_key=share_key) + path = key.split('/') + if len(path) == 4: + (_, _, chunknumstr) = path[3].partition('.') + chunknumstr = chunknumstr or "0" + if NUM_RE.match(chunknumstr) and int(chunknumstr) >= from_chunknum: + d2.addCallback(lambda ign, key=key: container.delete_object(key)) + d2.addErrback(_suppress_404) + return d2 + d.addCallback(_delete) + return d + + +class CloudShareBase(object): + implements(IShareBase) + """ + Attributes: + _container: (IContainer) the cloud container that stores this share + _storage_index: (str) binary storage index + _shnum: (integer) share number + _key: (str) the key prefix under which this share will be stored (no .chunknum suffix) + _data_length: (integer) length of data excluding headers and leases + _total_size: (integer) total size of the sharefile + + Methods: + _discard(self): object will no longer be used; discard references to potentially large data + """ + def __init__(self, container, storage_index, shnum): + precondition(IContainer.providedBy(container), container=container) + precondition(isinstance(storage_index, str), storage_index=storage_index) + precondition(isinstance(shnum, int), shnum=shnum) + + # These are always known immediately. + self._container = container + self._storage_index = storage_index + self._shnum = shnum + self._key = get_share_key(storage_index, shnum) + + # Subclasses must set _data_length and _total_size. + + def __repr__(self): + return ("<%s at %r key %r>" % (self.__class__.__name__, self._container, self._key,)) + + def get_storage_index(self): + return self._storage_index + + def get_storage_index_string(self): + return si_b2a(self._storage_index) + + def get_shnum(self): + return self._shnum + + def get_data_length(self): + return self._data_length + + def get_size(self): + return self._total_size + + def get_used_space(self): + # We're not charged for any per-object overheads in supported cloud services, so + # total object data sizes are what we're interested in for statistics and accounting. + return self.get_size() + + def unlink(self): + self._discard() + return delete_chunks(self._container, self._key) + + def _get_path(self): + """ + When used with the mock cloud container, this returns the path of the file containing + the first chunk. For a real cloud container, it raises an error. + """ + # It is OK that _get_path doesn't exist on real cloud container objects. + return self._container._get_path(self._key) + + +class CloudShareReaderMixin: + """ + Attributes: + _data_length: (integer) length of data excluding headers and leases + _chunksize: (integer) size of each chunk possibly excluding the last + _cache: (ChunkCache) the cache used to read chunks + + DATA_OFFSET: (integer) offset to the start-of-data from start of the sharefile + """ + def readv(self, readv): + sorted_readv = sorted(zip(readv, xrange(len(readv)))) + datav = [None]*len(readv) + for (v, i) in sorted_readv: + (offset, length) = v + datav[i] = self.read_share_data(offset, length) + return gatherResults(datav) + + def read_share_data(self, offset, length): + precondition(offset >= 0) + + # Reads beyond the end of the data are truncated. + # Reads that start beyond the end of the data return an empty string. + seekpos = self.DATA_OFFSET + offset + actuallength = max(0, min(length, self._data_length - offset)) + if actuallength == 0: + return defer.succeed("") + + lastpos = seekpos + actuallength - 1 + _assert(lastpos > 0, seekpos=seekpos, actuallength=actuallength, lastpos=lastpos) + start_chunknum = seekpos / self._chunksize + start_offset = seekpos % self._chunksize + last_chunknum = lastpos / self._chunksize + last_offset = lastpos % self._chunksize + _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum) + + parts = deque() + + def _load_part(ign, chunknum): + # determine which part of this chunk we need + start = 0 + end = self._chunksize + if chunknum == start_chunknum: + start = start_offset + if chunknum == last_chunknum: + end = last_offset + 1 + #print "LOAD", get_chunk_key(self._key, chunknum), start, end + + # d2 fires when we should continue loading the next chunk; chunkdata_d fires with the actual data. + chunkdata_d = defer.Deferred() + d2 = self._cache.get(chunknum, chunkdata_d) + if start > 0 or end < self._chunksize: + chunkdata_d.addCallback(lambda chunkdata: chunkdata[start : end]) + parts.append(chunkdata_d) + return d2 + + d = defer.succeed(None) + for i in xrange(start_chunknum, last_chunknum + 1): + d.addCallback(_load_part, i) + d.addCallback(lambda ign: gatherResults(parts)) + d.addCallback(lambda pieces: ''.join(pieces)) + return d + + +class CloudError(Exception): + pass + + +BACKOFF_SECONDS_FOR_5XX = (0, 2, 10) + + +class ContainerRetryMixin: + """ + I provide a helper method for performing an operation on a cloud container that will retry up to + len(BACKOFF_SECONDS_FOR_5XX) times (not including the initial try). If the initial try fails, a + single incident will be triggered after the operation has succeeded or failed. + """ + + def _do_request(self, description, operation, *args, **kwargs): + d = defer.maybeDeferred(operation, *args, **kwargs) + def _retry(f): + d2 = self._handle_error(f, 1, None, description, operation, *args, **kwargs) + def _trigger_incident(res): + log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s", + arguments=args[:2], kwargs=kwargs, description=description, + level=log.WEIRD) + return res + d2.addBoth(_trigger_incident) + return d2 + d.addErrback(_retry) + return d + + def _handle_error(self, f, trynum, first_err_and_tb, description, operation, *args, **kwargs): + f.trap(self.ServiceError) + + # Don't use f.getTracebackObject() since a fake traceback will not do for the 3-arg form of 'raise'. + # tb can be None (which is acceptable for 3-arg raise) if we don't have a traceback. + tb = getattr(f, 'tb', None) + fargs = f.value.args + if len(fargs) > 2 and fargs[2] and 'signaturedoesnotmatch' in fargs[2].lower(): + fargs = fargs[:2] + ("SignatureDoesNotMatch response redacted",) + fargs[3:] + + args_without_data = args[:2] + msg = "try %d failed: %s %s %s" % (trynum, description, args_without_data, kwargs) + err = CloudError(msg, *fargs) + + # This should not trigger an incident; we want to do that at the end. + log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(fargs)s", + trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, fargs=repr(fargs), + level=log.INFREQUENT) + + if first_err_and_tb is None: + first_err_and_tb = (err, tb) + + if trynum > len(BACKOFF_SECONDS_FOR_5XX): + # If we run out of tries, raise the error we got on the first try (which *may* have + # a more useful traceback). + (first_err, first_tb) = first_err_and_tb + raise first_err.__class__, first_err, first_tb + + fargs = f.value.args + if len(fargs) > 0 and int(fargs[0]) >= 500 and int(fargs[0]) < 600: + # Retry on 5xx errors. + d = task.deferLater(reactor, BACKOFF_SECONDS_FOR_5XX[trynum-1], operation, *args, **kwargs) + d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs) + return d + + # If we get an error response other than a 5xx, raise that error even if it was on a retry. + raise err.__class__, err, tb + + +def concat(seqs): + """ + O(n), rather than O(n^2), concatenation of list-like things, returning a list. + I can't believe this isn't built in. + """ + total_len = 0 + for seq in seqs: + total_len += len(seq) + result = [None]*total_len + i = 0 + for seq in seqs: + for x in seq: + result[i] = x + i += 1 + _assert(i == total_len, i=i, total_len=total_len) + return result + + +class ContainerListMixin: + """ + S3 has a limitation of 1000 object entries returned on each list (GET Bucket) request. + I provide a helper method to repeat the call as many times as necessary to get a full + listing. The container is assumed to implement: + + def list_some_objects(self, **kwargs): + # kwargs may include 'prefix' and 'marker' parameters as documented at + # . + # returns Deferred ContainerListing + + Note that list_some_objects is assumed to be reliable; so, if retries are needed, + the container class should also inherit from ContainerRetryMixin and list_some_objects + should make the request via _do_request. + + The 'delimiter' parameter of the GET Bucket API is not supported. + """ + def list_objects(self, prefix=''): + kwargs = {'prefix': prefix} + all_contents = deque() + def _list_some(): + d2 = self.list_some_objects(**kwargs) + def _got_listing(res): + all_contents.append(res.contents) + if res.is_truncated == "true": + _assert(len(res.contents) > 0) + marker = res.contents[-1].key + _assert('marker' not in kwargs or marker > kwargs['marker'], + "Not making progress in list_objects", kwargs=kwargs, marker=marker) + kwargs['marker'] = marker + return _list_some() + else: + _assert(res.is_truncated == "false", is_truncated=res.is_truncated) + return res + d2.addCallback(_got_listing) + return d2 + + d = _list_some() + d.addCallback(lambda res: res.__class__(res.name, res.prefix, res.marker, res.max_keys, + "false", concat(all_contents))) + def _log(f): + log.msg(f, level=log.WEIRD) + return f + d.addErrback(_log) + return d + + +class BackpressurePipeline(object): + """ + I manage a pipeline of Deferred operations that allows the data source to feel backpressure + when the pipeline is "full". I do not actually limit the number of operations in progress. + """ + OPEN = 0 + CLOSING = 1 + CLOSED = 2 + + def __init__(self, capacity): + self._capacity = capacity # how full we can be before causing calls to 'add' to block + self._gauge = 0 # how full we are + self._waiting = [] # callers of add() who are blocked + self._unfinished = 0 # number of pending operations + self._result_d = defer.Deferred() + self._state = self.OPEN + + def add(self, _size, _func, *args, **kwargs): + if self._state == self.CLOSED: + msg = "add() called on closed BackpressurePipeline" + log.err(msg, level=log.WEIRD) + def _already_closed(): raise AssertionError(msg) + return defer.execute(_already_closed) + self._gauge += _size + self._unfinished += 1 + fd = defer.maybeDeferred(_func, *args, **kwargs) + fd.addBoth(self._call_finished, _size) + fd.addErrback(log.err, "BackpressurePipeline._call_finished raised an exception") + if self._gauge < self._capacity: + return defer.succeed(None) + d = defer.Deferred() + self._waiting.append(d) + return d + + def fail(self, f): + if self._state != self.CLOSED: + self._state = self.CLOSED + eventually_errback(self._result_d)(f) + + def flush(self): + if self._state == self.CLOSED: + return defer.succeed(self._result_d) + + d = self.close() + d.addBoth(self.reopen) + return d + + def close(self): + if self._state != self.CLOSED: + if self._unfinished == 0: + self._state = self.CLOSED + eventually_callback(self._result_d)(None) + else: + self._state = self.CLOSING + return self._result_d + + def reopen(self, res=None): + _assert(self._state == self.CLOSED, state=self._state) + self._result_d = defer.Deferred() + self._state = self.OPEN + return res + + def _call_finished(self, res, size): + self._unfinished -= 1 + self._gauge -= size + if isinstance(res, Failure): + self.fail(res) + + if self._state == self.CLOSING: + # repeat the unfinished == 0 check + self.close() + + if self._state == self.CLOSED: + while self._waiting: + d = self._waiting.pop(0) + eventual_chain(self._result_d, d) + elif self._gauge < self._capacity: + while self._waiting: + d = self._waiting.pop(0) + eventually_callback(d)(None) + return None + + +class ChunkCache(object): + """I cache chunks for a specific share object.""" + + def __init__(self, container, key, chunksize, nchunks=1, initial_cachemap={}): + self._container = container + self._key = key + self._chunksize = chunksize + self._nchunks = nchunks + + # chunknum -> deferred data + self._cachemap = initial_cachemap + self._pipeline = BackpressurePipeline(PIPELINE_DEPTH) + + def set_nchunks(self, nchunks): + self._nchunks = nchunks + + def _load_chunk(self, chunknum, chunkdata_d): + d = self._container.get_object(get_chunk_key(self._key, chunknum)) + eventual_chain(source=d, target=chunkdata_d) + return d + + def get(self, chunknum, result_d): + if chunknum in self._cachemap: + # cache hit; never stall + eventual_chain(source=self._cachemap[chunknum], target=result_d) + return defer.succeed(None) + + # Evict any chunks other than the first and last two, until there are + # three or fewer chunks left cached. + for candidate_chunknum in self._cachemap.keys(): + if len(self._cachemap) <= 3: + break + if candidate_chunknum not in (0, self._nchunks-2, self._nchunks-1): + self.flush_chunk(candidate_chunknum) + + # cache miss; stall when the pipeline is full + chunkdata_d = defer.Deferred() + d = self._pipeline.add(1, self._load_chunk, chunknum, chunkdata_d) + def _check(res): + _assert(res is not None) + return res + chunkdata_d.addCallback(_check) + self._cachemap[chunknum] = chunkdata_d + eventual_chain(source=chunkdata_d, target=result_d) + return d + + def flush_chunk(self, chunknum): + if chunknum in self._cachemap: + del self._cachemap[chunknum] + + def close(self): + self._cachemap = None + return self._pipeline.close() diff --git a/src/allmydata/storage/backends/cloud/immutable.py b/src/allmydata/storage/backends/cloud/immutable.py new file mode 100644 index 00000000..cc292982 --- /dev/null +++ b/src/allmydata/storage/backends/cloud/immutable.py @@ -0,0 +1,188 @@ + +import struct + +from cStringIO import StringIO + +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IShareForReading, IShareForWriting + +from allmydata.util.assertutil import precondition, _assert +from allmydata.util.mathutil import div_ceil +from allmydata.storage.common import UnknownImmutableContainerVersionError, DataTooLargeError +from allmydata.storage.backends.cloud import cloud_common +from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, \ + BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin + + +# Each share file (stored in the chunks with keys 'shares/$PREFIX/$STORAGEINDEX/$SHNUM.$CHUNK') +# contains lease information [currently inaccessible] and share data. The share data is +# accessed by RIBucketWriter.write and RIBucketReader.read . + +# The share file has the following layout: +# 0x00: share file version number, four bytes, current version is 1 +# 0x04: always zero (was share data length prior to Tahoe-LAFS v1.3.0) +# 0x08: number of leases, four bytes big-endian +# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) +# data_length + 0x0c: first lease. Each lease record is 72 bytes. (not used) + + +class ImmutableCloudShareMixin: + sharetype = "immutable" + LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility + HEADER = ">LLL" + HEADER_SIZE = struct.calcsize(HEADER) + DATA_OFFSET = HEADER_SIZE + + +class ImmutableCloudShareForWriting(CloudShareBase, ImmutableCloudShareMixin): + implements(IShareForWriting) + + def __init__(self, container, storage_index, shnum, allocated_data_length, incomingset): + """ + I won't allow more than allocated_data_length to be written to me. + """ + precondition(isinstance(allocated_data_length, (int, long)), allocated_data_length) + CloudShareBase.__init__(self, container, storage_index, shnum) + + self._chunksize = cloud_common.PREFERRED_CHUNK_SIZE + self._allocated_data_length = allocated_data_length + + self._buf = StringIO() + # The second field, which was the four-byte share data length in + # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0. + # We also write 0 for the number of leases. + self._buf.write(struct.pack(self.HEADER, 1, 0, 0) ) + self._set_size(self._buf.tell()) + self._current_chunknum = 0 + + self._incomingset = incomingset + self._incomingset.add( (storage_index, shnum) ) + + self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH) + + def _set_size(self, size): + self._total_size = size + self._data_length = size - self.DATA_OFFSET # no leases + + def get_allocated_data_length(self): + return self._allocated_data_length + + def write_share_data(self, offset, data): + """Write 'data' at position 'offset' past the end of the header.""" + seekpos = self.DATA_OFFSET + offset + precondition(seekpos >= self._total_size, offset=offset, seekpos=seekpos, total_size=self._total_size) + if offset + len(data) > self._allocated_data_length: + raise DataTooLargeError(self._allocated_data_length, offset, len(data)) + + self._set_size(self._total_size + len(data)) + return self._store_or_buffer( (seekpos, data, 0) ) + + def close(self): + chunkdata = self._buf.getvalue() + self._discard() + d = self._pipeline_store_next_chunk(chunkdata) + d.addCallback(lambda ign: self._pipeline.close()) + return d + + def _store_or_buffer(self, (seekpos, b, b_offset) ): + """ + Helper method that stores the next complete chunk to the container or buffers + an incomplete chunk. The data still to be written is b[b_offset:], but we may + only process part of it in this call. + """ + chunknum = seekpos / self._chunksize + offset_in_chunk = seekpos % self._chunksize + + _assert(chunknum >= self._current_chunknum, seekpos=seekpos, chunknum=chunknum, + current_chunknum=self._current_chunknum) + + if chunknum > self._current_chunknum or offset_in_chunk + (len(b) - b_offset) >= self._chunksize: + if chunknum > self._current_chunknum: + # The write left a gap that spans a chunk boundary. Fill with zeroes to the end + # of the current chunk and store it. + # TODO: test this case + self._buf.seek(self._chunksize - 1) + self._buf.write("\x00") + else: + # Store a complete chunk. + writelen = self._chunksize - offset_in_chunk + self._buf.seek(offset_in_chunk) + self._buf.write(b[b_offset : b_offset + writelen]) + seekpos += writelen + b_offset += writelen + + chunkdata = self._buf.getvalue() + self._buf = StringIO() + _assert(len(chunkdata) == self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize) + + d2 = self._pipeline_store_next_chunk(chunkdata) + d2.addCallback(lambda ign: self._store_or_buffer( (seekpos, b, b_offset) )) + return d2 + else: + # Buffer an incomplete chunk. + if b_offset > 0: + b = b[b_offset :] + self._buf.seek(offset_in_chunk) + self._buf.write(b) + return defer.succeed(None) + + def _pipeline_store_next_chunk(self, chunkdata): + chunkkey = get_chunk_key(self._key, self._current_chunknum) + self._current_chunknum += 1 + #print "STORING", chunkkey, len(chunkdata) + + # We'd like to stream writes, but the supported service containers + # (and the IContainer interface) don't support that yet. For txaws, see + # https://bugs.launchpad.net/txaws/+bug/767205 and + # https://bugs.launchpad.net/txaws/+bug/783801 + return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata) + + def _discard(self): + self._buf = None + self._incomingset.discard( (self.get_storage_index(), self.get_shnum()) ) + + +class ImmutableCloudShareForReading(CloudShareBase, ImmutableCloudShareMixin, CloudShareReaderMixin): + implements(IShareForReading) + + def __init__(self, container, storage_index, shnum, total_size, first_chunkdata): + CloudShareBase.__init__(self, container, storage_index, shnum) + + precondition(isinstance(total_size, (int, long)), total_size=total_size) + precondition(isinstance(first_chunkdata, str), type(first_chunkdata)) + precondition(len(first_chunkdata) <= total_size, len_first_chunkdata=len(first_chunkdata), total_size=total_size) + + chunksize = len(first_chunkdata) + if chunksize < self.HEADER_SIZE: + msg = "%r had incomplete header (%d bytes)" % (self, chunksize) + raise UnknownImmutableContainerVersionError(msg) + + self._total_size = total_size + self._chunksize = chunksize + nchunks = div_ceil(total_size, chunksize) + initial_cachemap = {0: defer.succeed(first_chunkdata)} + self._cache = ChunkCache(container, self._key, chunksize, nchunks, initial_cachemap) + #print "ImmutableCloudShareForReading", total_size, chunksize, self._key + + header = first_chunkdata[:self.HEADER_SIZE] + (version, unused, num_leases) = struct.unpack(self.HEADER, header) + + if version != 1: + msg = "%r had version %d but we wanted 1" % (self, version) + raise UnknownImmutableContainerVersionError(msg) + + # We cannot write leases in share files, but allow them to be present + # in case a share file is copied from a disk backend, or in case we + # need them in future. + self._data_length = total_size - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE) + + # TODO: raise a better exception. + _assert(self._data_length >= 0, data_length=self._data_length) + + # Boilerplate is in CloudShareBase, read implementation is in CloudShareReaderMixin. + # So nothing to implement here. Yay! + + def _discard(self): + pass diff --git a/src/allmydata/storage/backends/cloud/mock_cloud.py b/src/allmydata/storage/backends/cloud/mock_cloud.py new file mode 100644 index 00000000..73677626 --- /dev/null +++ b/src/allmydata/storage/backends/cloud/mock_cloud.py @@ -0,0 +1,231 @@ + +import os.path + +from twisted.internet import defer +from twisted.web.error import Error +from allmydata.util.deferredutil import async_iterate + +from zope.interface import implements + +from allmydata.storage.backends.cloud.cloud_common import IContainer, \ + ContainerRetryMixin, ContainerListMixin +from allmydata.util.time_format import iso_utc +from allmydata.util import fileutil + + +MAX_KEYS = 1000 + + +def configure_mock_cloud_backend(storedir, config): + from allmydata.storage.backends.cloud.cloud_backend import CloudBackend + + container = MockContainer(storedir) + return CloudBackend(container) + + +class MockContainer(ContainerRetryMixin, ContainerListMixin): + implements(IContainer) + """ + I represent a mock cloud container that stores its data in the local filesystem. + I also keep track of the number of loads and stores. + """ + + def __init__(self, storagedir): + self._storagedir = storagedir + self.container_name = "MockContainer" + self.ServiceError = MockServiceError + self._load_count = 0 + self._store_count = 0 + + fileutil.make_dirs(os.path.join(self._storagedir, "shares")) + + def __repr__(self): + return ("<%s at %r>" % (self.__class__.__name__, self._storagedir,)) + + def _create(self): + return defer.execute(self._not_implemented) + + def _delete(self): + return defer.execute(self._not_implemented) + + def _iterate_dirs(self): + shares_dir = os.path.join(self._storagedir, "shares") + for prefixstr in sorted(fileutil.listdir(shares_dir)): + prefixkey = "shares/%s" % (prefixstr,) + prefixdir = os.path.join(shares_dir, prefixstr) + for sistr in sorted(fileutil.listdir(prefixdir)): + sikey = "%s/%s" % (prefixkey, sistr) + sidir = os.path.join(prefixdir, sistr) + for shnumstr in sorted(fileutil.listdir(sidir)): + sharefile = os.path.join(sidir, shnumstr) + yield (sharefile, "%s/%s" % (sikey, shnumstr)) + + def _list_some_objects(self, ign, prefix='', marker=None, max_keys=None): + if max_keys is None: + max_keys = MAX_KEYS + contents = [] + def _next_share(res): + if res is None: + return + (sharefile, sharekey) = res + # note that all strings are > None + if sharekey.startswith(prefix) and sharekey > marker: + stat_result = os.stat(sharefile) + mtime_utc = iso_utc(stat_result.st_mtime, sep=' ')+'+00:00' + item = ContainerItem(key=sharekey, modification_date=mtime_utc, etag="", + size=stat_result.st_size, storage_class="STANDARD") + contents.append(item) + return len(contents) < max_keys + + d = async_iterate(_next_share, self._iterate_dirs()) + def _done(completed): + contents.sort(key=lambda item: item.key) + return ContainerListing(self.container_name, '', '', max_keys, + is_truncated=str(not completed).lower(), contents=contents) + d.addCallback(_done) + return d + + def _get_path(self, object_name, must_exist=False): + # This method is also called by tests. + sharefile = os.path.join(self._storagedir, object_name) + if must_exist and not os.path.exists(sharefile): + raise MockServiceError("", 404, "not found") + return sharefile + + def _put_object(self, ign, object_name, data, content_type, metadata): + assert content_type is None, content_type + assert metadata == {}, metadata + sharefile = self._get_path(object_name) + fileutil.make_dirs(os.path.dirname(sharefile)) + fileutil.write(sharefile, data) + self._store_count += 1 + return defer.succeed(None) + + def _get_object(self, ign, object_name): + self._load_count += 1 + data = fileutil.read(self._get_path(object_name, must_exist=True)) + return defer.succeed(data) + + def _head_object(self, ign, object_name): + return defer.execute(self._not_implemented) + + def _delete_object(self, ign, object_name): + fileutil.remove(self._get_path(object_name, must_exist=True)) + return defer.succeed(None) + + def _not_implemented(self): + raise NotImplementedError + + # methods that use error handling from ContainerRetryMixin + + def create(self): + return self._do_request('create bucket', self._create, self.container_name) + + def delete(self): + return self._do_request('delete bucket', self._delete, self.container_name) + + def list_some_objects(self, **kwargs): + return self._do_request('list objects', self._list_some_objects, self.container_name, **kwargs) + + def put_object(self, object_name, data, content_type=None, metadata={}): + return self._do_request('PUT object', self._put_object, self.container_name, object_name, + data, content_type, metadata) + + def get_object(self, object_name): + return self._do_request('GET object', self._get_object, self.container_name, object_name) + + def head_object(self, object_name): + return self._do_request('HEAD object', self._head_object, self.container_name, object_name) + + def delete_object(self, object_name): + return self._do_request('DELETE object', self._delete_object, self.container_name, object_name) + + def reset_load_store_counts(self): + self._load_count = 0 + self._store_count = 0 + + def get_load_count(self): + return self._load_count + + def get_store_count(self): + return self._store_count + + +class MockServiceError(Error): + """ + A error class similar to txaws' S3Error. + """ + def __init__(self, xml_bytes, status, message=None, response=None, request_id="", host_id=""): + Error.__init__(self, status, message, response) + self.original = xml_bytes + self.status = str(status) + self.message = str(message) + self.request_id = request_id + self.host_id = host_id + + def get_error_code(self): + return self.status + + def get_error_message(self): + return self.message + + def parse(self, xml_bytes=""): + raise NotImplementedError + + def has_error(self, errorString): + raise NotImplementedError + + def get_error_codes(self): + raise NotImplementedError + + def get_error_messages(self): + raise NotImplementedError + + +# Originally from txaws.s3.model (under different class names), which was under the MIT / Expat licence. + +class ContainerItem(object): + """ + An item in a listing of cloud objects. + """ + def __init__(self, key, modification_date, etag, size, storage_class, + owner=None): + self.key = key + self.modification_date = modification_date + self.etag = etag + self.size = size + self.storage_class = storage_class + self.owner = owner + + def __repr__(self): + return "" % ({ + "key": self.key, + "modification_date": self.modification_date, + "etag": self.etag, + "size": self.size, + "storage_class": self.storage_class, + "owner": self.owner, + },) + + +class ContainerListing(object): + def __init__(self, name, prefix, marker, max_keys, is_truncated, + contents=None, common_prefixes=None): + self.name = name + self.prefix = prefix + self.marker = marker + self.max_keys = max_keys + self.is_truncated = is_truncated + self.contents = contents + self.common_prefixes = common_prefixes + + def __repr__(self): + return "" % ({ + "name": self.name, + "prefix": self.prefix, + "marker": self.marker, + "max_keys": self.max_keys, + "is_truncated": self.is_truncated, + "contents": self.contents, + "common_prefixes": self.common_prefixes, + }) diff --git a/src/allmydata/storage/backends/cloud/mutable.py b/src/allmydata/storage/backends/cloud/mutable.py new file mode 100644 index 00000000..f8ad0969 --- /dev/null +++ b/src/allmydata/storage/backends/cloud/mutable.py @@ -0,0 +1,470 @@ + +import struct +from collections import deque + +from twisted.internet import defer +from allmydata.util.deferredutil import gatherResults, async_iterate + +from zope.interface import implements + +from allmydata.interfaces import IMutableShare, BadWriteEnablerError +from allmydata.util import idlib, log +from allmydata.util.assertutil import precondition, _assert +from allmydata.util.mathutil import div_ceil +from allmydata.util.hashutil import timing_safe_compare +from allmydata.storage.common import UnknownMutableContainerVersionError, DataTooLargeError +from allmydata.storage.backends.base import testv_compare +from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE +from allmydata.storage.backends.cloud import cloud_common +from allmydata.storage.backends.cloud.cloud_common import get_chunk_key, get_zero_chunkdata, \ + delete_chunks, BackpressurePipeline, ChunkCache, CloudShareBase, CloudShareReaderMixin + + +# Mutable shares have a different layout to immutable shares. See docs/mutable.rst +# for more details. + +# # offset size name +# 1 0 32 magic verstr "tahoe mutable container v1" plus binary +# 2 32 20 write enabler's nodeid +# 3 52 32 write enabler +# 4 84 8 data size (actual share data present) (a) +# 5 92 8 offset of (8) count of extra leases (after data) +# 6 100 368 four leases, 92 bytes each, unused +# 7 468 (a) data +# 8 ?? 4 count of extra leases +# 9 ?? n*92 extra leases + + +# The struct module doc says that L's are 4 bytes in size, and that Q's are +# 8 bytes in size. Since compatibility depends upon this, double-check it. +assert struct.calcsize(">L") == 4, struct.calcsize(">L") +assert struct.calcsize(">Q") == 8, struct.calcsize(">Q") + + +class Namespace(object): + pass + + +class MutableCloudShare(CloudShareBase, CloudShareReaderMixin): + implements(IMutableShare) + + sharetype = "mutable" + DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") + EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 + HEADER = ">32s20s32sQQ" + HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases + LEASE_SIZE = struct.calcsize(">LL32s32s20s") + assert LEASE_SIZE == 92, LEASE_SIZE + DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE + assert DATA_OFFSET == 468, DATA_OFFSET + NUM_EXTRA_LEASES_SIZE = struct.calcsize(">L") + + MAGIC = MUTABLE_MAGIC + assert len(MAGIC) == 32 + MAX_SIZE = MAX_MUTABLE_SHARE_SIZE + + def __init__(self, container, storage_index, shnum, total_size, first_chunkdata, parent=None): + CloudShareBase.__init__(self, container, storage_index, shnum) + + precondition(isinstance(total_size, (int, long)), total_size=total_size) + precondition(isinstance(first_chunkdata, str), type(first_chunkdata)) + precondition(len(first_chunkdata) <= total_size, "total size is smaller than first chunk", + len_first_chunkdata=len(first_chunkdata), total_size=total_size) + + if len(first_chunkdata) < self.HEADER_SIZE: + msg = "%r had incomplete header (%d bytes)" % (self, len(first_chunkdata)) + raise UnknownMutableContainerVersionError(msg) + + header = first_chunkdata[:self.HEADER_SIZE] + (magic, write_enabler_nodeid, real_write_enabler, + data_length, extra_lease_offset) = struct.unpack(self.HEADER, header) + + if magic != self.MAGIC: + msg = "%r had magic %r but we wanted %r" % (self, magic, self.MAGIC) + raise UnknownMutableContainerVersionError(msg) + + self._write_enabler_nodeid = write_enabler_nodeid + self._real_write_enabler = real_write_enabler + + # We want to support changing PREFERRED_CHUNK_SIZE without breaking compatibility, + # but without "rechunking" any existing shares. Also, existing shares created by + # the pre-chunking code should be handled correctly. + + # If there is more than one chunk, the chunksize must be equal to the size of the + # first chunk, to avoid rechunking. + self._chunksize = len(first_chunkdata) + if self._chunksize == total_size: + # There is only one chunk, so we are at liberty to make the chunksize larger + # than that chunk, but not smaller. + self._chunksize = max(self._chunksize, cloud_common.PREFERRED_CHUNK_SIZE) + + self._zero_chunkdata = get_zero_chunkdata(self._chunksize) + + initial_cachemap = {0: defer.succeed(first_chunkdata)} + self._cache = ChunkCache(container, self._key, self._chunksize, initial_cachemap=initial_cachemap) + #print "CONSTRUCT %s with %r" % (object.__repr__(self), self._cache) + self._data_length = data_length + self._set_total_size(self.DATA_OFFSET + data_length + self.NUM_EXTRA_LEASES_SIZE) + + # The initial total size may not be less than the size of header + data + extra lease count. + # TODO: raise a better exception. + _assert(total_size >= self._total_size, share=repr(self), + total_size=total_size, self_total_size=self._total_size, data_length=data_length) + self._is_oversize = total_size > self._total_size + + self._pipeline = BackpressurePipeline(cloud_common.PIPELINE_DEPTH) + + self.parent = parent # for logging + + def _set_total_size(self, total_size): + self._total_size = total_size + self._nchunks = div_ceil(self._total_size, self._chunksize) + self._cache.set_nchunks(self._nchunks) + + def log(self, *args, **kwargs): + if self.parent: + return self.parent.log(*args, **kwargs) + + @classmethod + def create_empty_share(cls, container, serverid, write_enabler, storage_index=None, shnum=None, parent=None): + # Unlike the disk backend, we don't check that the cloud object does not exist; + # we assume that it does not because create was used, and no-one else should be + # writing to the bucket. + + # There are no extra leases, but for compatibility, the offset they would have + # still needs to be stored in the header. + data_length = 0 + extra_lease_offset = cls.DATA_OFFSET + data_length + header = struct.pack(cls.HEADER, cls.MAGIC, serverid, write_enabler, + data_length, extra_lease_offset) + leases = "\x00"*(cls.LEASE_SIZE * 4) + extra_lease_count = struct.pack(">L", 0) + first_chunkdata = header + leases + extra_lease_count + + share = cls(container, storage_index, shnum, len(first_chunkdata), first_chunkdata, parent=parent) + + d = share._raw_writev(deque([(0, first_chunkdata)]), 0, 0) + d.addCallback(lambda ign: share) + return d + + def _discard(self): + # TODO: discard read cache + pass + + def check_write_enabler(self, write_enabler): + # avoid a timing attack + if not timing_safe_compare(write_enabler, self._real_write_enabler): + # accomodate share migration by reporting the nodeid used for the + # old write enabler. + self.log(format="bad write enabler on SI %(si)s," + " recorded by nodeid %(nodeid)s", + facility="tahoe.storage", + level=log.WEIRD, umid="DF2fCR", + si=self.get_storage_index_string(), + nodeid=idlib.nodeid_b2a(self._write_enabler_nodeid)) + msg = "The write enabler was recorded by nodeid '%s'." % \ + (idlib.nodeid_b2a(self._write_enabler_nodeid),) + raise BadWriteEnablerError(msg) + return defer.succeed(None) + + def check_testv(self, testv): + def _test( (offset, length, operator, specimen) ): + d = self.read_share_data(offset, length) + d.addCallback(lambda data: testv_compare(data, operator, specimen)) + return d + return async_iterate(_test, sorted(testv)) + + def writev(self, datav, new_length): + precondition(new_length is None or new_length >= 0, new_length=new_length) + + raw_datav, preserved_size, new_data_length = self._prepare_writev(datav, new_length) + return self._raw_writev(raw_datav, preserved_size, new_data_length) + + def _prepare_writev(self, datav, new_length): + # Translate the client's write vector and 'new_length' into a "raw" write vector + # and new total size. This has no side effects to make it easier to test. + + preserved_size = self.DATA_OFFSET + self._data_length + + # chunk containing the byte after the current end-of-data + endofdata_chunknum = preserved_size / self._chunksize + + # Whether we need to add a dummy write to zero-extend the end-of-data chunk. + ns = Namespace() + ns.need_zeroextend_write = preserved_size % self._chunksize != 0 + + raw_datav = deque() + def _add_write(seekpos, data): + #print "seekpos =", seekpos + raw_datav.append( (seekpos, data) ) + + lastpos = seekpos + len(data) - 1 + start_chunknum = seekpos / self._chunksize + last_chunknum = lastpos / self._chunksize + if start_chunknum <= endofdata_chunknum and endofdata_chunknum <= last_chunknum: + # If any of the client's writes overlaps the end-of-data chunk, we should not + # add the zero-extending dummy write. + ns.need_zeroextend_write = False + + #print "need_zeroextend_write =", ns.need_zeroextend_write + new_data_length = self._data_length + + # Validate the write vector and translate its offsets into seek positions from + # the start of the share. + for (offset, data) in datav: + length = len(data) + precondition(offset >= 0, offset=offset) + if offset + length > self.MAX_SIZE: + raise DataTooLargeError() + + if new_length is not None and new_length < offset + length: + length = max(0, new_length - offset) + data = data[: length] + + new_data_length = max(new_data_length, offset + length) + if length > 0: + _add_write(self.DATA_OFFSET + offset, data) + + # new_length can only be used to truncate, not extend. + if new_length is not None: + new_data_length = min(new_length, new_data_length) + + # If the data length has changed, include additional raw writes to the data length + # field in the header, and to the extra lease count field after the data. + # + # Also do this if there were extra leases (e.g. if this was a share copied from a + # disk backend), so that they will be deleted. If the size hasn't changed and there + # are no extra leases, we don't bother to ensure that the extra lease count field is + # zero; it is ignored anyway. + if new_data_length != self._data_length or self._is_oversize: + extra_lease_offset = self.DATA_OFFSET + new_data_length + + # Don't preserve old data past the new end-of-data. + preserved_size = min(preserved_size, extra_lease_offset) + + # These are disjoint with any ranges already in raw_datav. + _add_write(self.DATA_LENGTH_OFFSET, struct.pack(">Q", new_data_length)) + _add_write(extra_lease_offset, struct.pack(">L", 0)) + + #print "need_zeroextend_write =", ns.need_zeroextend_write + # If the data length is being increased and there are no other writes to the + # current end-of-data chunk (including the two we just added), add a dummy write + # of one zero byte at the end of that chunk. This will cause that chunk to be + # zero-extended to the full chunk size, which would not otherwise happen. + if new_data_length > self._data_length and ns.need_zeroextend_write: + _add_write((endofdata_chunknum + 1)*self._chunksize - 1, "\x00") + + # Sorting the writes simplifies things (and we need all the simplification we can get :-) + raw_datav = deque(sorted(raw_datav, key=lambda (offset, data): offset)) + + # Complain if write vector elements overlap, that's too hard in general. + (last_seekpos, last_data) = (0, "") + have_duplicates = False + for (i, (seekpos, data)) in enumerate(raw_datav): + # The MDMF publisher in 1.9.0 and 1.9.1 produces duplicated writes to the MDMF header. + # If this is an exactly duplicated write, skip it. + if seekpos == last_seekpos and data == last_data: + raw_datav[i] = None + have_duplicates = True + else: + last_endpos = last_seekpos + len(last_data) + _assert(seekpos >= last_endpos, "overlapping write vector elements", + seekpos=seekpos, last_seekpos=last_seekpos, last_endpos=last_endpos) + (last_seekpos, last_data) = (seekpos, data) + + if have_duplicates: + raw_datav.remove(None) + + # Return a vector of writes to ranges in the share, the size of previous contents to + # be preserved, and the final data length. + return (raw_datav, preserved_size, new_data_length) + + def _raw_writev(self, raw_datav, preserved_size, new_data_length): + #print "%r._raw_writev(%r, %r, %r)" % (self, raw_datav, preserved_size, new_data_length) + + old_nchunks = self._nchunks + + # The _total_size and _nchunks attributes are updated as each write is applied. + self._set_total_size(preserved_size) + + final_size = self.DATA_OFFSET + new_data_length + self.NUM_EXTRA_LEASES_SIZE + + d = self._raw_write_share_data(None, raw_datav, final_size) + + def _resize(ign): + self._data_length = new_data_length + self._set_total_size(final_size) + + if self._nchunks < old_nchunks or self._is_oversize: + self._is_oversize = False + #print "DELETING chunks from", self._nchunks + return delete_chunks(self._container, self._key, from_chunknum=self._nchunks) + d.addCallback(_resize) + + d.addCallback(lambda ign: self._pipeline.flush()) + return d + + def _raw_write_share_data(self, ign, raw_datav, final_size): + """ + raw_datav: (deque of (integer, str)) the remaining raw write vector + final_size: (integer) the size the file will be after all writes in the writev + """ + #print "%r._raw_write_share_data(%r, %r)" % (self, (seekpos, data), final_size) + + precondition(final_size >= 0, final_size=final_size) + + d = defer.succeed(None) + if not raw_datav: + return d + + (seekpos, data) = raw_datav.popleft() + _assert(seekpos >= 0 and len(data) > 0, seekpos=seekpos, len_data=len(data), + len_raw_datav=len(raw_datav), final_size=final_size) + + # We *may* need to read the start chunk and/or last chunk before rewriting them. + # (If they are the same chunk, that's fine, the cache will ensure we don't + # read the cloud object twice.) + lastpos = seekpos + len(data) - 1 + _assert(lastpos > 0, seekpos=seekpos, len_data=len(data), lastpos=lastpos) + start_chunknum = seekpos / self._chunksize + start_chunkpos = start_chunknum*self._chunksize + start_offset = seekpos % self._chunksize + last_chunknum = lastpos / self._chunksize + last_chunkpos = last_chunknum*self._chunksize + last_offset = lastpos % self._chunksize + _assert(start_chunknum <= last_chunknum, start_chunknum=start_chunknum, last_chunknum=last_chunknum) + + #print "lastpos =", lastpos + #print "len(data) =", len(data) + #print "start_chunknum =", start_chunknum + #print "start_offset =", start_offset + #print "last_chunknum =", last_chunknum + #print "last_offset =", last_offset + #print "_total_size =", self._total_size + #print "_chunksize =", self._chunksize + #print "_nchunks =", self._nchunks + + start_chunkdata_d = defer.Deferred() + last_chunkdata_d = defer.Deferred() + + # Is the first byte of the start chunk preserved? + if start_chunknum*self._chunksize < self._total_size and start_offset > 0: + # Yes, so we need to read it first. + d.addCallback(lambda ign: self._cache.get(start_chunknum, start_chunkdata_d)) + else: + start_chunkdata_d.callback("") + + # Is any byte of the last chunk preserved? + if last_chunkpos < self._total_size and lastpos < min(self._total_size, last_chunkpos + self._chunksize) - 1: + # Yes, so we need to read it first. + d.addCallback(lambda ign: self._cache.get(last_chunknum, last_chunkdata_d)) + else: + last_chunkdata_d.callback("") + + d.addCallback(lambda ign: gatherResults( (start_chunkdata_d, last_chunkdata_d) )) + def _got( (start_chunkdata, last_chunkdata) ): + #print "start_chunkdata =", len(start_chunkdata), repr(start_chunkdata) + #print "last_chunkdata =", len(last_chunkdata), repr(last_chunkdata) + d2 = defer.succeed(None) + + # Zero any chunks from self._nchunks (i.e. after the last currently valid chunk) + # to before the start chunk of the write. + for zero_chunknum in xrange(self._nchunks, start_chunknum): + d2.addCallback(self._pipeline_store_chunk, zero_chunknum, self._zero_chunkdata) + + # start_chunkdata and last_chunkdata may need to be truncated and/or zero-extended. + start_preserved = max(0, min(len(start_chunkdata), self._total_size - start_chunkpos, start_offset)) + last_preserved = max(0, min(len(last_chunkdata), self._total_size - last_chunkpos)) + + start_chunkdata = (start_chunkdata[: start_preserved] + + self._zero_chunkdata[: max(0, start_offset - start_preserved)] + + data[: self._chunksize - start_offset]) + + # last_slice_len = len(last_chunkdata[last_offset + 1 : last_preserved]) + last_slice_len = max(0, last_preserved - (last_offset + 1)) + last_chunksize = min(final_size - last_chunkpos, self._chunksize) + last_chunkdata = (last_chunkdata[last_offset + 1 : last_preserved] + + self._zero_chunkdata[: max(0, last_chunksize - (last_offset + 1) - last_slice_len)]) + + # This loop eliminates redundant reads and writes, by merging the contents of writes + # after this one into last_chunkdata as far as possible. It ensures that we never need + # to read a chunk twice in the same writev (which is needed for correctness; see below). + while raw_datav: + # Does the next write start in the same chunk as this write ends (last_chunknum)? + (next_seekpos, next_chunkdata) = raw_datav[0] + next_start_chunknum = next_seekpos / self._chunksize + next_start_offset = next_seekpos % self._chunksize + next_lastpos = next_seekpos + len(next_chunkdata) - 1 + + if next_start_chunknum != last_chunknum: + break + + _assert(next_start_offset > last_offset, + next_start_offset=next_start_offset, last_offset=last_offset) + + # Cut next_chunkdata at the end of next_start_chunknum. + next_cutpos = (next_start_chunknum + 1)*self._chunksize + last_chunkdata = (last_chunkdata[: next_start_offset - (last_offset + 1)] + + next_chunkdata[: next_cutpos - next_seekpos] + + last_chunkdata[next_lastpos - lastpos :]) + + # Does the next write extend beyond that chunk? + if next_lastpos >= next_cutpos: + # The part after the cut will be processed in the next call to _raw_write_share_data. + raw_datav[0] = (next_cutpos, next_chunkdata[next_cutpos - next_seekpos :]) + break + else: + # Discard the write that has already been processed. + raw_datav.popleft() + + # start_chunknum and last_chunknum are going to be written, so need to be flushed + # from the read cache in case the new contents are needed by a subsequent readv + # or writev. (Due to the 'while raw_datav' loop above, we won't need to read them + # again in *this* writev. That property is needed for correctness because we don't + # flush the write pipeline until the end of the writev.) + + d2.addCallback(lambda ign: self._cache.flush_chunk(start_chunkdata)) + d2.addCallback(lambda ign: self._cache.flush_chunk(last_chunkdata)) + + # Now do the current write. + if last_chunknum == start_chunknum: + d2.addCallback(self._pipeline_store_chunk, start_chunknum, + start_chunkdata + last_chunkdata) + else: + d2.addCallback(self._pipeline_store_chunk, start_chunknum, + start_chunkdata) + + for middle_chunknum in xrange(start_chunknum + 1, last_chunknum): + d2.addCallback(self._pipeline_store_chunk, middle_chunknum, + data[middle_chunknum*self._chunksize - seekpos + : (middle_chunknum + 1)*self._chunksize - seekpos]) + + d2.addCallback(self._pipeline_store_chunk, last_chunknum, + data[last_chunkpos - seekpos :] + last_chunkdata) + return d2 + d.addCallback(_got) + d.addCallback(self._raw_write_share_data, raw_datav, final_size) # continue the iteration + return d + + def _pipeline_store_chunk(self, ign, chunknum, chunkdata): + precondition(len(chunkdata) <= self._chunksize, len_chunkdata=len(chunkdata), chunksize=self._chunksize) + + chunkkey = get_chunk_key(self._key, chunknum) + #print "STORING", chunkkey, len(chunkdata), repr(chunkdata) + + endpos = chunknum*self._chunksize + len(chunkdata) + if endpos > self._total_size: + self._set_total_size(endpos) + + # We'd like to stream writes, but the supported service containers + # (and the IContainer interface) don't support that yet. For txaws, see + # https://bugs.launchpad.net/txaws/+bug/767205 and + # https://bugs.launchpad.net/txaws/+bug/783801 + return self._pipeline.add(1, self._container.put_object, chunkkey, chunkdata) + + def close(self): + # FIXME: 'close' doesn't exist in IMutableShare + self._discard() + d = self._pipeline.close() + d.addCallback(lambda ign: self._cache.close()) + return d \ No newline at end of file diff --git a/src/allmydata/storage/backends/cloud/s3/s3_container.py b/src/allmydata/storage/backends/cloud/s3/s3_container.py new file mode 100644 index 00000000..d1d6712a --- /dev/null +++ b/src/allmydata/storage/backends/cloud/s3/s3_container.py @@ -0,0 +1,101 @@ + +from zope.interface import implements + +from allmydata.node import InvalidValueError +from allmydata.storage.backends.cloud.cloud_common import IContainer, \ + ContainerRetryMixin, ContainerListMixin + + +def configure_s3_container(storedir, config): + from allmydata.storage.backends.cloud.s3.s3_container import S3Container + + accesskeyid = config.get_config("storage", "s3.access_key_id") + secretkey = config.get_or_create_private_config("s3secret") + usertoken = config.get_optional_private_config("s3usertoken") + producttoken = config.get_optional_private_config("s3producttoken") + if producttoken and not usertoken: + raise InvalidValueError("If private/s3producttoken is present, private/s3usertoken must also be present.") + url = config.get_config("storage", "s3.url", "http://s3.amazonaws.com") + container_name = config.get_config("storage", "s3.bucket") + + return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken) + + +class S3Container(ContainerRetryMixin, ContainerListMixin): + implements(IContainer) + """ + I represent a real S3 container (bucket), accessed using the txaws library. + """ + + def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None): + # We only depend on txaws when this class is actually instantiated. + from txaws.credentials import AWSCredentials + from txaws.service import AWSServiceEndpoint + from txaws.s3.client import S3Client, Query + from txaws.s3.exception import S3Error + + creds = AWSCredentials(access_key=access_key, secret_key=secret_key) + endpoint = AWSServiceEndpoint(uri=url) + + query_factory = None + if usertoken is not None: + def make_query(*args, **kwargs): + amz_headers = kwargs.get("amz_headers", {}) + if producttoken is not None: + amz_headers["security-token"] = (usertoken, producttoken) + else: + amz_headers["security-token"] = usertoken + kwargs["amz_headers"] = amz_headers + + return Query(*args, **kwargs) + query_factory = make_query + + self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory) + self.container_name = container_name + self.ServiceError = S3Error + + def __repr__(self): + return ("<%s %r>" % (self.__class__.__name__, self.container_name,)) + + def create(self): + return self._do_request('create bucket', self.client.create, self.container_name) + + def delete(self): + return self._do_request('delete bucket', self.client.delete, self.container_name) + + def list_some_objects(self, **kwargs): + return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs) + + def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): + return self._do_request('PUT object', self.client.put_object, self.container_name, + object_name, data, content_type, metadata) + + def get_object(self, object_name): + return self._do_request('GET object', self.client.get_object, self.container_name, object_name) + + def head_object(self, object_name): + return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name) + + def delete_object(self, object_name): + return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name) + + def put_policy(self, policy): + """ + Set access control policy on a bucket. + """ + query = self.client.query_factory( + action='PUT', creds=self.client.creds, endpoint=self.client.endpoint, + bucket=self.container_name, object_name='?policy', data=policy) + return self._do_request('PUT policy', query.submit) + + def get_policy(self): + query = self.client.query_factory( + action='GET', creds=self.client.creds, endpoint=self.client.endpoint, + bucket=self.container_name, object_name='?policy') + return self._do_request('GET policy', query.submit) + + def delete_policy(self): + query = self.client.query_factory( + action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint, + bucket=self.container_name, object_name='?policy') + return self._do_request('DELETE policy', query.submit) diff --git a/src/allmydata/storage/backends/disk/disk_backend.py b/src/allmydata/storage/backends/disk/disk_backend.py new file mode 100644 index 00000000..2d24f694 --- /dev/null +++ b/src/allmydata/storage/backends/disk/disk_backend.py @@ -0,0 +1,184 @@ + +import struct, os.path + +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IStorageBackend, IShareSet +from allmydata.util import fileutil, log +from allmydata.storage.common import si_b2a, si_a2b, NUM_RE, \ + UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError +from allmydata.storage.bucket import BucketWriter +from allmydata.storage.backends.base import Backend, ShareSet +from allmydata.storage.backends.disk.immutable import load_immutable_disk_share, create_immutable_disk_share +from allmydata.storage.backends.disk.mutable import load_mutable_disk_share, create_mutable_disk_share +from allmydata.mutable.layout import MUTABLE_MAGIC + + +# storage/ +# storage/shares/incoming +# incoming/ holds temp dirs named $PREFIX/$STORAGEINDEX/$SHNUM which will +# be moved to storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM upon success +# storage/shares/$PREFIX/$STORAGEINDEX +# storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM + +# where "$PREFIX" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 +# base-32 chars). + + +def si_si2dir(startdir, storage_index): + sia = si_b2a(storage_index) + return os.path.join(startdir, sia[:2], sia) + +def get_disk_share(home, storage_index=None, shnum=None): + f = open(home, 'rb') + try: + prefix = f.read(len(MUTABLE_MAGIC)) + finally: + f.close() + + if prefix == MUTABLE_MAGIC: + return load_mutable_disk_share(home, storage_index, shnum) + else: + # assume it's immutable + return load_immutable_disk_share(home, storage_index, shnum) + + +def configure_disk_backend(storedir, config): + readonly = config.get_config("storage", "readonly", False, boolean=True) + reserved_space = config.get_config_size("storage", "reserved_space", "0") + + return DiskBackend(storedir, readonly, reserved_space) + + +class DiskBackend(Backend): + implements(IStorageBackend) + + def __init__(self, storedir, readonly=False, reserved_space=0): + Backend.__init__(self) + self._storedir = storedir + self._readonly = readonly + self._reserved_space = int(reserved_space) + self._sharedir = os.path.join(self._storedir, 'shares') + fileutil.make_dirs(self._sharedir) + self._incomingdir = os.path.join(self._sharedir, 'incoming') + self._clean_incomplete() + if self._reserved_space and (self.get_available_space() is None): + log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored", + umid="0wZ27w", level=log.UNUSUAL) + + def _clean_incomplete(self): + fileutil.rm_dir(self._incomingdir) + fileutil.make_dirs(self._incomingdir) + + def get_sharesets_for_prefix(self, prefix): + prefixdir = os.path.join(self._sharedir, prefix) + sharesets = [self.get_shareset(si_a2b(si_s)) + for si_s in sorted(fileutil.listdir(prefixdir))] + return defer.succeed(sharesets) + + def get_shareset(self, storage_index): + sharehomedir = si_si2dir(self._sharedir, storage_index) + incominghomedir = si_si2dir(self._incomingdir, storage_index) + return DiskShareSet(storage_index, sharehomedir, incominghomedir) + + def fill_in_space_stats(self, stats): + stats['storage_server.reserved_space'] = self._reserved_space + try: + disk = fileutil.get_disk_stats(self._sharedir, self._reserved_space) + writeable = disk['avail'] > 0 + + # spacetime predictors should use disk_avail / (d(disk_used)/dt) + stats['storage_server.disk_total'] = disk['total'] + stats['storage_server.disk_used'] = disk['used'] + stats['storage_server.disk_free_for_root'] = disk['free_for_root'] + stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot'] + stats['storage_server.disk_avail'] = disk['avail'] + except AttributeError: + writeable = True + except EnvironmentError: + log.msg("OS call to get disk statistics failed", level=log.UNUSUAL) + writeable = False + + if self._readonly: + stats['storage_server.disk_avail'] = 0 + writeable = False + + stats['storage_server.accepting_immutable_shares'] = int(writeable) + + def get_available_space(self): + if self._readonly: + return 0 + try: + return fileutil.get_available_space(self._sharedir, self._reserved_space) + except EnvironmentError: + return 0 + + def must_use_tubid_as_permutation_seed(self): + # A disk backend with existing shares must assume that it was around before #466, + # so must use its TubID as a permutation-seed. + return bool(set(fileutil.listdir(self._sharedir)) - set(["incoming"])) + + +class DiskShareSet(ShareSet): + implements(IShareSet) + + def __init__(self, storage_index, sharehomedir, incominghomedir=None): + ShareSet.__init__(self, storage_index) + self._sharehomedir = sharehomedir + self._incominghomedir = incominghomedir + + def get_overhead(self): + return (fileutil.get_used_space(self._sharehomedir) + + fileutil.get_used_space(self._incominghomedir)) + + def get_shares(self): + si = self.get_storage_index() + shares = {} + corrupted = set() + for shnumstr in fileutil.listdir(self._sharehomedir, filter=NUM_RE): + shnum = int(shnumstr) + sharefile = os.path.join(self._sharehomedir, shnumstr) + try: + shares[shnum] = get_disk_share(sharefile, si, shnum) + except (UnknownMutableContainerVersionError, + UnknownImmutableContainerVersionError, + struct.error): + corrupted.add(shnum) + + valid = [shares[shnum] for shnum in sorted(shares.keys())] + return defer.succeed( (valid, corrupted) ) + + def get_share(self, shnum): + return get_disk_share(os.path.join(self._sharehomedir, str(shnum)), + self.get_storage_index(), shnum) + + def delete_share(self, shnum): + fileutil.remove(os.path.join(self._sharehomedir, str(shnum))) + return defer.succeed(None) + + def has_incoming(self, shnum): + if self._incominghomedir is None: + return False + return os.path.exists(os.path.join(self._incominghomedir, str(shnum))) + + def make_bucket_writer(self, account, shnum, allocated_data_length, canary): + finalhome = os.path.join(self._sharehomedir, str(shnum)) + incominghome = os.path.join(self._incominghomedir, str(shnum)) + immsh = create_immutable_disk_share(incominghome, finalhome, allocated_data_length, + self.get_storage_index(), shnum) + bw = BucketWriter(account, immsh, canary) + return bw + + def _create_mutable_share(self, account, shnum, write_enabler): + fileutil.make_dirs(self._sharehomedir) + sharehome = os.path.join(self._sharehomedir, str(shnum)) + serverid = account.server.get_serverid() + return create_mutable_disk_share(sharehome, serverid, write_enabler, + self.get_storage_index(), shnum, parent=account.server) + + def _clean_up_after_unlink(self): + fileutil.rmdir_if_empty(self._sharehomedir) + + def _get_sharedir(self): + return self._sharehomedir diff --git a/src/allmydata/storage/backends/null/null_backend.py b/src/allmydata/storage/backends/null/null_backend.py new file mode 100644 index 00000000..dbd5c9e7 --- /dev/null +++ b/src/allmydata/storage/backends/null/null_backend.py @@ -0,0 +1,193 @@ + +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IStorageBackend, IShareSet, IShareBase, \ + IShareForReading, IShareForWriting, IMutableShare + +from allmydata.util.assertutil import precondition +from allmydata.storage.backends.base import Backend, ShareSet, empty_check_testv +from allmydata.storage.bucket import BucketWriter +from allmydata.storage.common import si_b2a + + +def configure_null_backend(storedir, config): + return NullBackend() + + +class NullBackend(Backend): + implements(IStorageBackend) + """ + I am a test backend that records (in memory) which shares exist, but not their contents, leases, + or write-enablers. + """ + + def __init__(self): + Backend.__init__(self) + # mapping from storage_index to NullShareSet + self._sharesets = {} + + def get_available_space(self): + return None + + def get_sharesets_for_prefix(self, prefix): + sharesets = [] + for (si, shareset) in self._sharesets.iteritems(): + if si_b2a(si).startswith(prefix): + sharesets.append(shareset) + + def _by_base32si(b): + return b.get_storage_index_string() + sharesets.sort(key=_by_base32si) + return defer.succeed(sharesets) + + def get_shareset(self, storage_index): + shareset = self._sharesets.get(storage_index, None) + if shareset is None: + shareset = NullShareSet(storage_index) + self._sharesets[storage_index] = shareset + return shareset + + def fill_in_space_stats(self, stats): + pass + + +class NullShareSet(ShareSet): + implements(IShareSet) + + def __init__(self, storage_index): + self.storage_index = storage_index + self._incoming_shnums = set() + self._immutable_shnums = set() + self._mutable_shnums = set() + + def close_shnum(self, shnum): + self._incoming_shnums.remove(shnum) + self._immutable_shnums.add(shnum) + return defer.succeed(None) + + def get_overhead(self): + return 0 + + def get_shares(self): + shares = {} + for shnum in self._immutable_shnums: + shares[shnum] = ImmutableNullShare(self, shnum) + for shnum in self._mutable_shnums: + shares[shnum] = MutableNullShare(self, shnum) + # This backend never has any corrupt shares. + return defer.succeed( ([shares[shnum] for shnum in sorted(shares.keys())], set()) ) + + def get_share(self, shnum): + if shnum in self._immutable_shnums: + return defer.succeed(ImmutableNullShare(self, shnum)) + elif shnum in self._mutable_shnums: + return defer.succeed(MutableNullShare(self, shnum)) + else: + def _not_found(): raise IndexError("no such share %d" % (shnum,)) + return defer.execute(_not_found) + + def delete_share(self, shnum, include_incoming=False): + if include_incoming and (shnum in self._incoming_shnums): + self._incoming_shnums.remove(shnum) + if shnum in self._immutable_shnums: + self._immutable_shnums.remove(shnum) + if shnum in self._mutable_shnums: + self._mutable_shnums.remove(shnum) + return defer.succeed(None) + + def has_incoming(self, shnum): + return shnum in self._incoming_shnums + + def get_storage_index(self): + return self.storage_index + + def get_storage_index_string(self): + return si_b2a(self.storage_index) + + def make_bucket_writer(self, account, shnum, allocated_data_length, canary): + self._incoming_shnums.add(shnum) + immutableshare = ImmutableNullShare(self, shnum) + bw = BucketWriter(account, immutableshare, canary) + bw.throw_out_all_data = True + return bw + + +class NullShareBase(object): + implements(IShareBase) + + def __init__(self, shareset, shnum): + self.shareset = shareset + self.shnum = shnum + + def get_storage_index(self): + return self.shareset.get_storage_index() + + def get_storage_index_string(self): + return self.shareset.get_storage_index_string() + + def get_shnum(self): + return self.shnum + + def get_data_length(self): + return 0 + + def get_size(self): + return 0 + + def get_used_space(self): + return 0 + + def unlink(self): + return self.shareset.delete_share(self.shnum, include_incoming=True) + + def readv(self, readv): + datav = [] + for (offset, length) in readv: + datav.append("") + return defer.succeed(datav) + + def get_leases(self): + pass + + def add_lease(self, lease): + pass + + def renew_lease(self, renew_secret, new_expire_time): + raise IndexError("unable to renew non-existent lease") + + def add_or_renew_lease(self, lease_info): + pass + + +class ImmutableNullShare(NullShareBase): + implements(IShareForReading, IShareForWriting) + sharetype = "immutable" + + def read_share_data(self, offset, length): + precondition(offset >= 0) + return defer.succeed("") + + def get_allocated_data_length(self): + return 0 + + def write_share_data(self, offset, data): + return defer.succeed(None) + + def close(self): + return self.shareset.close_shnum(self.shnum) + + +class MutableNullShare(NullShareBase): + implements(IMutableShare) + sharetype = "mutable" + + def check_write_enabler(self, write_enabler): + # Null backend doesn't check write enablers. + return defer.succeed(None) + + def check_testv(self, testv): + return defer.succeed(empty_check_testv(testv)) + + def writev(self, datav, new_length): + return defer.succeed(None)