From: David-Sarah Hopwood Date: Thu, 22 Nov 2012 05:33:35 +0000 (+0000) Subject: Move code around and add new directories for cloud backend merge. X-Git-Url: https://git.rkrishnan.org/listings/cyclelanguage?a=commitdiff_plain;h=dc2542fe050f97a6e84802d30cbdb7357fd01e2e;p=tahoe-lafs%2Ftahoe-lafs.git Move code around and add new directories for cloud backend merge. Signed-off-by: David-Sarah Hopwood --- diff --git a/setup.py b/setup.py index 035b0c9c..456c7fa2 100644 --- a/setup.py +++ b/setup.py @@ -439,6 +439,11 @@ setup(name=APPNAME, 'allmydata.mutable', 'allmydata.scripts', 'allmydata.storage', + 'allmydata.storage.backends', + 'allmydata.storage.backends.cloud', + 'allmydata.storage.backends.cloud.s3', + 'allmydata.storage.backends.disk', + 'allmydata.storage.backends.null', 'allmydata.test', 'allmydata.util', 'allmydata.web', diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index c3a7b5c3..b01ca32a 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -31,7 +31,7 @@ verify-cap for the file that uses the share. self['filename'] = argv_to_abspath(filename) def dump_share(options): - from allmydata.storage.mutable import MutableShareFile + from allmydata.storage.backends.disk.mutable import MutableShareFile from allmydata.util.encodingutil import quote_output out = options.stdout @@ -48,7 +48,7 @@ def dump_share(options): return dump_immutable_share(options) def dump_immutable_share(options): - from allmydata.storage.immutable import ShareFile + from allmydata.storage.backends.disk.immutable import ShareFile out = options.stdout f = ShareFile(options['filename']) @@ -149,7 +149,7 @@ def format_expiration_time(expiration_time): def dump_mutable_share(options): - from allmydata.storage.mutable import MutableShareFile + from allmydata.storage.backends.disk.mutable import MutableShareFile from allmydata.util import base32, idlib out = options.stdout m = MutableShareFile(options['filename']) @@ -619,8 +619,8 @@ def call(c, *args, **kwargs): def describe_share(abs_sharefile, si_s, shnum_s, now, out): from allmydata import uri - from allmydata.storage.mutable import MutableShareFile - from allmydata.storage.immutable import ShareFile + from allmydata.storage.backends.disk.mutable import MutableShareFile + from allmydata.storage.backends.disk.immutable import ShareFile from allmydata.mutable.layout import unpack_share from allmydata.mutable.common import NeedMoreDataError from allmydata.immutable.layout import ReadBucketProxy @@ -810,8 +810,8 @@ Obviously, this command should not be used in normal operation. def corrupt_share(options): import random - from allmydata.storage.mutable import MutableShareFile - from allmydata.storage.immutable import ShareFile + from allmydata.storage.backends.disk.mutable import MutableShareFile + from allmydata.storage.backends.disk.immutable import ShareFile from allmydata.mutable.layout import unpack_header from allmydata.immutable.layout import ReadBucketProxy out = options.stdout diff --git a/src/allmydata/storage/backends/__init__.py b/src/allmydata/storage/backends/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/allmydata/storage/backends/cloud/__init__.py b/src/allmydata/storage/backends/cloud/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/allmydata/storage/backends/cloud/s3/__init__.py b/src/allmydata/storage/backends/cloud/s3/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/allmydata/storage/backends/disk/__init__.py b/src/allmydata/storage/backends/disk/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py new file mode 100644 index 00000000..780b04a4 --- /dev/null +++ b/src/allmydata/storage/backends/disk/immutable.py @@ -0,0 +1,250 @@ + +import os, struct, time + +from foolscap.api import Referenceable + +from zope.interface import implements +from allmydata.interfaces import RIBucketWriter, RIBucketReader +from allmydata.util import base32, fileutil, log +from allmydata.util.fileutil import get_used_space +from allmydata.util.assertutil import precondition +from allmydata.storage.common import UnknownImmutableContainerVersionError, \ + DataTooLargeError +from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE + + +# Each share file (in storage/shares/$SI/$SHNUM) contains share data that +# can be accessed by RIBucketWriter.write and RIBucketReader.read . + +# The share file has the following layout: +# 0x00: share file version number, four bytes, current version is 1 +# 0x04: share data length, four bytes big-endian # Footnote 1 +# 0x08: number of leases, four bytes big-endian = N # Footnote 2 +# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) +# filesize - 72*N: leases (ignored). Each lease is 72 bytes. + +# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers. + +# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers. +# New shares will have a 0 here. Old shares will have whatever value was left +# over when the server was upgraded. All lease information is now kept in the +# leasedb. + + +class ShareFile: + sharetype = "immutable" + LEASE_SIZE = struct.calcsize(">L32s32sL") + HEADER = ">LLL" + HEADER_SIZE = struct.calcsize(HEADER) + DATA_OFFSET = HEADER_SIZE + + def __init__(self, filename, max_size=None, create=False): + """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ + precondition((max_size is not None) or (not create), max_size, create) + self.home = filename + self._max_size = max_size + if create: + # touch the file, so later callers will see that we're working on + # it. Also construct the metadata. + assert not os.path.exists(self.home) + fileutil.make_dirs(os.path.dirname(self.home)) + f = open(self.home, 'wb') + # The second field -- the four-byte share data length -- is no + # longer used as of Tahoe v1.3.0, but we continue to write it in + # there in case someone downgrades a storage server from >= + # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one + # server to another, etc. We do saturation -- a share data length + # larger than 2**32-1 (what can fit into the field) is marked as + # the largest length that can fit into the field. That way, even + # if this does happen, the old < v1.3.0 server will still allow + # clients to read the first part of the share. + f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0)) + f.close() + self._data_length = max_size + else: + f = open(self.home, 'rb') + try: + (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE)) + finally: + f.close() + if version != 1: + msg = "sharefile %s had version %d but we wanted 1" % \ + (filename, version) + raise UnknownImmutableContainerVersionError(msg) + + filesize = os.stat(self.home).st_size + self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE) + + # TODO: raise a better exception. + assert self._data_length >= 0, self._data_length + + def get_used_space(self): + return get_used_space(self.home) + + def unlink(self): + os.unlink(self.home) + + def get_size(self): + return os.stat(self.home).st_size + + def read_share_data(self, offset, length): + precondition(offset >= 0) + + # Reads beyond the end of the data are truncated. Reads that start + # beyond the end of the data return an empty string. + seekpos = self.DATA_OFFSET + offset + actuallength = max(0, min(length, self._data_length - offset)) + if actuallength == 0: + return "" + f = open(self.home, 'rb') + try: + f.seek(seekpos) + return f.read(actuallength) + finally: + f.close() + + def write_share_data(self, offset, data): + length = len(data) + precondition(offset >= 0, offset) + if self._max_size is not None and offset+length > self._max_size: + raise DataTooLargeError(self._max_size, offset, length) + f = open(self.home, 'rb+') + try: + real_offset = self.DATA_OFFSET + offset + f.seek(real_offset) + assert f.tell() == real_offset + f.write(data) + finally: + f.close() + + +class BucketWriter(Referenceable): + implements(RIBucketWriter) + + def __init__(self, ss, account, storage_index, shnum, + incominghome, finalhome, max_size, canary): + self.ss = ss + self.incominghome = incominghome + self.finalhome = finalhome + self._max_size = max_size # don't allow the client to write more than this + self._account = account + self._storage_index = storage_index + self._shnum = shnum + self._canary = canary + self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) + self.closed = False + self.throw_out_all_data = False + self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) + self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) + + def allocated_size(self): + return self._max_size + + def remote_write(self, offset, data): + start = time.time() + precondition(not self.closed) + if self.throw_out_all_data: + return + self._sharefile.write_share_data(offset, data) + self.ss.add_latency("write", time.time() - start) + self.ss.count("write") + + def remote_close(self): + precondition(not self.closed) + start = time.time() + + fileutil.make_dirs(os.path.dirname(self.finalhome)) + fileutil.rename(self.incominghome, self.finalhome) + try: + # self.incominghome is like storage/shares/incoming/ab/abcde/4 . + # We try to delete the parent (.../ab/abcde) to avoid leaving + # these directories lying around forever, but the delete might + # fail if we're working on another share for the same storage + # index (like ab/abcde/5). The alternative approach would be to + # use a hierarchy of objects (PrefixHolder, BucketHolder, + # ShareWriter), each of which is responsible for a single + # directory on disk, and have them use reference counting of + # their children to know when they should do the rmdir. This + # approach is simpler, but relies on os.rmdir refusing to delete + # a non-empty directory. Do *not* use fileutil.rm_dir() here! + os.rmdir(os.path.dirname(self.incominghome)) + # we also delete the grandparent (prefix) directory, .../ab , + # again to avoid leaving directories lying around. This might + # fail if there is another bucket open that shares a prefix (like + # ab/abfff). + os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) + # we leave the great-grandparent (incoming/) directory in place. + except EnvironmentError: + # ignore the "can't rmdir because the directory is not empty" + # exceptions, those are normal consequences of the + # above-mentioned conditions. + pass + self._sharefile = None + self.closed = True + self._canary.dontNotifyOnDisconnect(self._disconnect_marker) + + filelen = get_used_space(self.finalhome) + self.ss.bucket_writer_closed(self, filelen) + self._account.add_or_renew_default_lease(self._storage_index, self._shnum) + self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) + self.ss.add_latency("close", time.time() - start) + self.ss.count("close") + + def _disconnected(self): + if not self.closed: + self._abort() + + def remote_abort(self): + log.msg("storage: aborting sharefile %s" % self.incominghome, + facility="tahoe.storage", level=log.UNUSUAL) + if not self.closed: + self._canary.dontNotifyOnDisconnect(self._disconnect_marker) + self._abort() + self.ss.count("abort") + + def _abort(self): + if self.closed: + return + + os.remove(self.incominghome) + # if we were the last share to be moved, remove the incoming/ + # directory that was our parent + parentdir = os.path.split(self.incominghome)[0] + if not os.listdir(parentdir): + os.rmdir(parentdir) + self._sharefile = None + self._account.remove_share_and_leases(self._storage_index, self._shnum) + + # We are now considered closed for further writing. We must tell + # the storage server about this so that it stops expecting us to + # use the space it allocated for us earlier. + self.closed = True + self.ss.bucket_writer_closed(self, 0) + + +class BucketReader(Referenceable): + implements(RIBucketReader) + + def __init__(self, ss, sharefname, storage_index=None, shnum=None): + self.ss = ss + self._share_file = ShareFile(sharefname) + self.storage_index = storage_index + self.shnum = shnum + + def __repr__(self): + return "<%s %s %s>" % (self.__class__.__name__, + base32.b2a_l(self.storage_index[:8], 60), + self.shnum) + + def remote_read(self, offset, length): + start = time.time() + data = self._share_file.read_share_data(offset, length) + self.ss.add_latency("read", time.time() - start) + self.ss.count("read") + return data + + def remote_advise_corrupt_share(self, reason): + return self.ss.client_advise_corrupt_share("immutable", + self.storage_index, + self.shnum, + reason) diff --git a/src/allmydata/storage/backends/disk/mutable.py b/src/allmydata/storage/backends/disk/mutable.py new file mode 100644 index 00000000..4d834958 --- /dev/null +++ b/src/allmydata/storage/backends/disk/mutable.py @@ -0,0 +1,255 @@ + +import os, struct + +from allmydata.interfaces import BadWriteEnablerError +from allmydata.util import idlib, log +from allmydata.util.assertutil import precondition +from allmydata.util.hashutil import timing_safe_compare +from allmydata.util.fileutil import get_used_space +from allmydata.storage.common import UnknownMutableContainerVersionError, \ + DataTooLargeError +from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE + + +# the MutableShareFile is like the ShareFile, but used for mutable data. It +# has a different layout. See docs/mutable.txt for more details. + +# # offset size name +# 1 0 32 magic verstr "tahoe mutable container v1" plus binary +# 2 32 20 write enabler's nodeid +# 3 52 32 write enabler +# 4 84 8 data size (actual share data present) (a) +# 5 92 8 offset of (8) count of extra leases (after data) +# 6 100 368 four leases, 92 bytes each +# 0 4 ownerid (0 means "no lease here") +# 4 4 expiration timestamp +# 8 32 renewal token +# 40 32 cancel token +# 72 20 nodeid which accepted the tokens +# 7 468 (a) data +# 8 ?? 4 count of extra leases +# 9 ?? n*92 extra leases + + +# The struct module doc says that L's are 4 bytes in size., and that Q's are +# 8 bytes in size. Since compatibility depends upon this, double-check it. +assert struct.calcsize(">L") == 4, struct.calcsize(">L") +assert struct.calcsize(">Q") == 8, struct.calcsize(">Q") + +class MutableShareFile: + + sharetype = "mutable" + DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") + HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases + LEASE_SIZE = struct.calcsize(">LL32s32s20s") + assert LEASE_SIZE == 92 + DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE + assert DATA_OFFSET == 468, DATA_OFFSET + # our sharefiles share with a recognizable string, plus some random + # binary data to reduce the chance that a regular text file will look + # like a sharefile. + MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" + assert len(MAGIC) == 32 + MAX_SIZE = MAX_MUTABLE_SHARE_SIZE + # TODO: decide upon a policy for max share size + + def __init__(self, filename, parent=None): + self.home = filename + if os.path.exists(self.home): + # we don't cache anything, just check the magic + f = open(self.home, 'rb') + data = f.read(self.HEADER_SIZE) + (magic, + write_enabler_nodeid, write_enabler, + data_length, extra_lease_offset) = \ + struct.unpack(">32s20s32sQQ", data) + if magic != self.MAGIC: + msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ + (filename, magic, self.MAGIC) + raise UnknownMutableContainerVersionError(msg) + self.parent = parent # for logging + + def log(self, *args, **kwargs): + return self.parent.log(*args, **kwargs) + + def create(self, my_nodeid, write_enabler): + assert not os.path.exists(self.home) + data_length = 0 + extra_lease_offset = (self.HEADER_SIZE + + 4 * self.LEASE_SIZE + + data_length) + assert extra_lease_offset == self.DATA_OFFSET # true at creation + num_extra_leases = 0 + f = open(self.home, 'wb') + header = struct.pack(">32s20s32sQQ", + self.MAGIC, my_nodeid, write_enabler, + data_length, extra_lease_offset, + ) + leases = ("\x00"*self.LEASE_SIZE) * 4 + f.write(header + leases) + # data goes here, empty after creation + f.write(struct.pack(">L", num_extra_leases)) + # extra leases go here, none at creation + f.close() + + def get_used_space(self): + return get_used_space(self.home) + + def unlink(self): + os.unlink(self.home) + + def get_size(self): + return os.stat(self.home).st_size + + def _read_data_length(self, f): + f.seek(self.DATA_LENGTH_OFFSET) + (data_length,) = struct.unpack(">Q", f.read(8)) + return data_length + + def _read_container_size(self, f): + f.seek(self.DATA_LENGTH_OFFSET + 8) + (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) + return extra_lease_offset - self.DATA_OFFSET + + def _write_data_length(self, f, data_length): + extra_lease_offset = self.DATA_OFFSET + data_length + f.seek(self.DATA_LENGTH_OFFSET) + f.write(struct.pack(">QQ", data_length, extra_lease_offset)) + f.seek(extra_lease_offset) + f.write(struct.pack(">L", 0)) + + def _read_share_data(self, f, offset, length): + precondition(offset >= 0) + data_length = self._read_data_length(f) + if offset+length > data_length: + # reads beyond the end of the data are truncated. Reads that + # start beyond the end of the data return an empty string. + length = max(0, data_length-offset) + if length == 0: + return "" + precondition(offset+length <= data_length) + f.seek(self.DATA_OFFSET+offset) + data = f.read(length) + return data + + def _write_share_data(self, f, offset, data): + length = len(data) + precondition(offset >= 0) + data_length = self._read_data_length(f) + + if offset+length >= data_length: + # They are expanding their data size. + + if offset+length > self.MAX_SIZE: + raise DataTooLargeError() + + # Their data now fits in the current container. We must write + # their new data and modify the recorded data size. + + # Fill any newly exposed empty space with 0's. + if offset > data_length: + f.seek(self.DATA_OFFSET+data_length) + f.write('\x00'*(offset - data_length)) + f.flush() + + new_data_length = offset+length + self._write_data_length(f, new_data_length) + # an interrupt here will result in a corrupted share + + # now all that's left to do is write out their data + f.seek(self.DATA_OFFSET+offset) + f.write(data) + return + + def _read_write_enabler_and_nodeid(self, f): + f.seek(0) + data = f.read(self.HEADER_SIZE) + (magic, + write_enabler_nodeid, write_enabler, + data_length, extra_least_offset) = \ + struct.unpack(">32s20s32sQQ", data) + assert magic == self.MAGIC + return (write_enabler, write_enabler_nodeid) + + def readv(self, readv): + datav = [] + f = open(self.home, 'rb') + for (offset, length) in readv: + datav.append(self._read_share_data(f, offset, length)) + f.close() + return datav + + def check_write_enabler(self, write_enabler, si_s): + f = open(self.home, 'rb+') + (real_write_enabler, write_enabler_nodeid) = \ + self._read_write_enabler_and_nodeid(f) + f.close() + # avoid a timing attack + #if write_enabler != real_write_enabler: + if not timing_safe_compare(write_enabler, real_write_enabler): + # accomodate share migration by reporting the nodeid used for the + # old write enabler. + self.log(format="bad write enabler on SI %(si)s," + " recorded by nodeid %(nodeid)s", + facility="tahoe.storage", + level=log.WEIRD, umid="cE1eBQ", + si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid)) + msg = "The write enabler was recorded by nodeid '%s'." % \ + (idlib.nodeid_b2a(write_enabler_nodeid),) + raise BadWriteEnablerError(msg) + + def check_testv(self, testv): + test_good = True + f = open(self.home, 'rb+') + for (offset, length, operator, specimen) in testv: + data = self._read_share_data(f, offset, length) + if not testv_compare(data, operator, specimen): + test_good = False + break + f.close() + return test_good + + def writev(self, datav, new_length): + f = open(self.home, 'rb+') + for (offset, data) in datav: + self._write_share_data(f, offset, data) + if new_length is not None: + cur_length = self._read_data_length(f) + if new_length < cur_length: + self._write_data_length(f, new_length) + # TODO: shrink the share file. + f.close() + +def testv_compare(a, op, b): + assert op in ("lt", "le", "eq", "ne", "ge", "gt") + if op == "lt": + return a < b + if op == "le": + return a <= b + if op == "eq": + return a == b + if op == "ne": + return a != b + if op == "ge": + return a >= b + if op == "gt": + return a > b + # never reached + +class EmptyShare: + + def check_testv(self, testv): + test_good = True + for (offset, length, operator, specimen) in testv: + data = "" + if not testv_compare(data, operator, specimen): + test_good = False + break + return test_good + +def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent): + ms = MutableShareFile(filename, parent) + ms.create(my_nodeid, write_enabler) + del ms + return MutableShareFile(filename, parent) + diff --git a/src/allmydata/storage/backends/null/__init__.py b/src/allmydata/storage/backends/null/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py deleted file mode 100644 index 780b04a4..00000000 --- a/src/allmydata/storage/immutable.py +++ /dev/null @@ -1,250 +0,0 @@ - -import os, struct, time - -from foolscap.api import Referenceable - -from zope.interface import implements -from allmydata.interfaces import RIBucketWriter, RIBucketReader -from allmydata.util import base32, fileutil, log -from allmydata.util.fileutil import get_used_space -from allmydata.util.assertutil import precondition -from allmydata.storage.common import UnknownImmutableContainerVersionError, \ - DataTooLargeError -from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE - - -# Each share file (in storage/shares/$SI/$SHNUM) contains share data that -# can be accessed by RIBucketWriter.write and RIBucketReader.read . - -# The share file has the following layout: -# 0x00: share file version number, four bytes, current version is 1 -# 0x04: share data length, four bytes big-endian # Footnote 1 -# 0x08: number of leases, four bytes big-endian = N # Footnote 2 -# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) -# filesize - 72*N: leases (ignored). Each lease is 72 bytes. - -# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers. - -# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers. -# New shares will have a 0 here. Old shares will have whatever value was left -# over when the server was upgraded. All lease information is now kept in the -# leasedb. - - -class ShareFile: - sharetype = "immutable" - LEASE_SIZE = struct.calcsize(">L32s32sL") - HEADER = ">LLL" - HEADER_SIZE = struct.calcsize(HEADER) - DATA_OFFSET = HEADER_SIZE - - def __init__(self, filename, max_size=None, create=False): - """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ - precondition((max_size is not None) or (not create), max_size, create) - self.home = filename - self._max_size = max_size - if create: - # touch the file, so later callers will see that we're working on - # it. Also construct the metadata. - assert not os.path.exists(self.home) - fileutil.make_dirs(os.path.dirname(self.home)) - f = open(self.home, 'wb') - # The second field -- the four-byte share data length -- is no - # longer used as of Tahoe v1.3.0, but we continue to write it in - # there in case someone downgrades a storage server from >= - # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one - # server to another, etc. We do saturation -- a share data length - # larger than 2**32-1 (what can fit into the field) is marked as - # the largest length that can fit into the field. That way, even - # if this does happen, the old < v1.3.0 server will still allow - # clients to read the first part of the share. - f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0)) - f.close() - self._data_length = max_size - else: - f = open(self.home, 'rb') - try: - (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE)) - finally: - f.close() - if version != 1: - msg = "sharefile %s had version %d but we wanted 1" % \ - (filename, version) - raise UnknownImmutableContainerVersionError(msg) - - filesize = os.stat(self.home).st_size - self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE) - - # TODO: raise a better exception. - assert self._data_length >= 0, self._data_length - - def get_used_space(self): - return get_used_space(self.home) - - def unlink(self): - os.unlink(self.home) - - def get_size(self): - return os.stat(self.home).st_size - - def read_share_data(self, offset, length): - precondition(offset >= 0) - - # Reads beyond the end of the data are truncated. Reads that start - # beyond the end of the data return an empty string. - seekpos = self.DATA_OFFSET + offset - actuallength = max(0, min(length, self._data_length - offset)) - if actuallength == 0: - return "" - f = open(self.home, 'rb') - try: - f.seek(seekpos) - return f.read(actuallength) - finally: - f.close() - - def write_share_data(self, offset, data): - length = len(data) - precondition(offset >= 0, offset) - if self._max_size is not None and offset+length > self._max_size: - raise DataTooLargeError(self._max_size, offset, length) - f = open(self.home, 'rb+') - try: - real_offset = self.DATA_OFFSET + offset - f.seek(real_offset) - assert f.tell() == real_offset - f.write(data) - finally: - f.close() - - -class BucketWriter(Referenceable): - implements(RIBucketWriter) - - def __init__(self, ss, account, storage_index, shnum, - incominghome, finalhome, max_size, canary): - self.ss = ss - self.incominghome = incominghome - self.finalhome = finalhome - self._max_size = max_size # don't allow the client to write more than this - self._account = account - self._storage_index = storage_index - self._shnum = shnum - self._canary = canary - self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) - self.closed = False - self.throw_out_all_data = False - self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) - self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) - - def allocated_size(self): - return self._max_size - - def remote_write(self, offset, data): - start = time.time() - precondition(not self.closed) - if self.throw_out_all_data: - return - self._sharefile.write_share_data(offset, data) - self.ss.add_latency("write", time.time() - start) - self.ss.count("write") - - def remote_close(self): - precondition(not self.closed) - start = time.time() - - fileutil.make_dirs(os.path.dirname(self.finalhome)) - fileutil.rename(self.incominghome, self.finalhome) - try: - # self.incominghome is like storage/shares/incoming/ab/abcde/4 . - # We try to delete the parent (.../ab/abcde) to avoid leaving - # these directories lying around forever, but the delete might - # fail if we're working on another share for the same storage - # index (like ab/abcde/5). The alternative approach would be to - # use a hierarchy of objects (PrefixHolder, BucketHolder, - # ShareWriter), each of which is responsible for a single - # directory on disk, and have them use reference counting of - # their children to know when they should do the rmdir. This - # approach is simpler, but relies on os.rmdir refusing to delete - # a non-empty directory. Do *not* use fileutil.rm_dir() here! - os.rmdir(os.path.dirname(self.incominghome)) - # we also delete the grandparent (prefix) directory, .../ab , - # again to avoid leaving directories lying around. This might - # fail if there is another bucket open that shares a prefix (like - # ab/abfff). - os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) - # we leave the great-grandparent (incoming/) directory in place. - except EnvironmentError: - # ignore the "can't rmdir because the directory is not empty" - # exceptions, those are normal consequences of the - # above-mentioned conditions. - pass - self._sharefile = None - self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - - filelen = get_used_space(self.finalhome) - self.ss.bucket_writer_closed(self, filelen) - self._account.add_or_renew_default_lease(self._storage_index, self._shnum) - self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) - self.ss.add_latency("close", time.time() - start) - self.ss.count("close") - - def _disconnected(self): - if not self.closed: - self._abort() - - def remote_abort(self): - log.msg("storage: aborting sharefile %s" % self.incominghome, - facility="tahoe.storage", level=log.UNUSUAL) - if not self.closed: - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - self._abort() - self.ss.count("abort") - - def _abort(self): - if self.closed: - return - - os.remove(self.incominghome) - # if we were the last share to be moved, remove the incoming/ - # directory that was our parent - parentdir = os.path.split(self.incominghome)[0] - if not os.listdir(parentdir): - os.rmdir(parentdir) - self._sharefile = None - self._account.remove_share_and_leases(self._storage_index, self._shnum) - - # We are now considered closed for further writing. We must tell - # the storage server about this so that it stops expecting us to - # use the space it allocated for us earlier. - self.closed = True - self.ss.bucket_writer_closed(self, 0) - - -class BucketReader(Referenceable): - implements(RIBucketReader) - - def __init__(self, ss, sharefname, storage_index=None, shnum=None): - self.ss = ss - self._share_file = ShareFile(sharefname) - self.storage_index = storage_index - self.shnum = shnum - - def __repr__(self): - return "<%s %s %s>" % (self.__class__.__name__, - base32.b2a_l(self.storage_index[:8], 60), - self.shnum) - - def remote_read(self, offset, length): - start = time.time() - data = self._share_file.read_share_data(offset, length) - self.ss.add_latency("read", time.time() - start) - self.ss.count("read") - return data - - def remote_advise_corrupt_share(self, reason): - return self.ss.client_advise_corrupt_share("immutable", - self.storage_index, - self.shnum, - reason) diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py deleted file mode 100644 index 4d834958..00000000 --- a/src/allmydata/storage/mutable.py +++ /dev/null @@ -1,255 +0,0 @@ - -import os, struct - -from allmydata.interfaces import BadWriteEnablerError -from allmydata.util import idlib, log -from allmydata.util.assertutil import precondition -from allmydata.util.hashutil import timing_safe_compare -from allmydata.util.fileutil import get_used_space -from allmydata.storage.common import UnknownMutableContainerVersionError, \ - DataTooLargeError -from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE - - -# the MutableShareFile is like the ShareFile, but used for mutable data. It -# has a different layout. See docs/mutable.txt for more details. - -# # offset size name -# 1 0 32 magic verstr "tahoe mutable container v1" plus binary -# 2 32 20 write enabler's nodeid -# 3 52 32 write enabler -# 4 84 8 data size (actual share data present) (a) -# 5 92 8 offset of (8) count of extra leases (after data) -# 6 100 368 four leases, 92 bytes each -# 0 4 ownerid (0 means "no lease here") -# 4 4 expiration timestamp -# 8 32 renewal token -# 40 32 cancel token -# 72 20 nodeid which accepted the tokens -# 7 468 (a) data -# 8 ?? 4 count of extra leases -# 9 ?? n*92 extra leases - - -# The struct module doc says that L's are 4 bytes in size., and that Q's are -# 8 bytes in size. Since compatibility depends upon this, double-check it. -assert struct.calcsize(">L") == 4, struct.calcsize(">L") -assert struct.calcsize(">Q") == 8, struct.calcsize(">Q") - -class MutableShareFile: - - sharetype = "mutable" - DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") - HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases - LEASE_SIZE = struct.calcsize(">LL32s32s20s") - assert LEASE_SIZE == 92 - DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE - assert DATA_OFFSET == 468, DATA_OFFSET - # our sharefiles share with a recognizable string, plus some random - # binary data to reduce the chance that a regular text file will look - # like a sharefile. - MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" - assert len(MAGIC) == 32 - MAX_SIZE = MAX_MUTABLE_SHARE_SIZE - # TODO: decide upon a policy for max share size - - def __init__(self, filename, parent=None): - self.home = filename - if os.path.exists(self.home): - # we don't cache anything, just check the magic - f = open(self.home, 'rb') - data = f.read(self.HEADER_SIZE) - (magic, - write_enabler_nodeid, write_enabler, - data_length, extra_lease_offset) = \ - struct.unpack(">32s20s32sQQ", data) - if magic != self.MAGIC: - msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ - (filename, magic, self.MAGIC) - raise UnknownMutableContainerVersionError(msg) - self.parent = parent # for logging - - def log(self, *args, **kwargs): - return self.parent.log(*args, **kwargs) - - def create(self, my_nodeid, write_enabler): - assert not os.path.exists(self.home) - data_length = 0 - extra_lease_offset = (self.HEADER_SIZE - + 4 * self.LEASE_SIZE - + data_length) - assert extra_lease_offset == self.DATA_OFFSET # true at creation - num_extra_leases = 0 - f = open(self.home, 'wb') - header = struct.pack(">32s20s32sQQ", - self.MAGIC, my_nodeid, write_enabler, - data_length, extra_lease_offset, - ) - leases = ("\x00"*self.LEASE_SIZE) * 4 - f.write(header + leases) - # data goes here, empty after creation - f.write(struct.pack(">L", num_extra_leases)) - # extra leases go here, none at creation - f.close() - - def get_used_space(self): - return get_used_space(self.home) - - def unlink(self): - os.unlink(self.home) - - def get_size(self): - return os.stat(self.home).st_size - - def _read_data_length(self, f): - f.seek(self.DATA_LENGTH_OFFSET) - (data_length,) = struct.unpack(">Q", f.read(8)) - return data_length - - def _read_container_size(self, f): - f.seek(self.DATA_LENGTH_OFFSET + 8) - (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) - return extra_lease_offset - self.DATA_OFFSET - - def _write_data_length(self, f, data_length): - extra_lease_offset = self.DATA_OFFSET + data_length - f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">QQ", data_length, extra_lease_offset)) - f.seek(extra_lease_offset) - f.write(struct.pack(">L", 0)) - - def _read_share_data(self, f, offset, length): - precondition(offset >= 0) - data_length = self._read_data_length(f) - if offset+length > data_length: - # reads beyond the end of the data are truncated. Reads that - # start beyond the end of the data return an empty string. - length = max(0, data_length-offset) - if length == 0: - return "" - precondition(offset+length <= data_length) - f.seek(self.DATA_OFFSET+offset) - data = f.read(length) - return data - - def _write_share_data(self, f, offset, data): - length = len(data) - precondition(offset >= 0) - data_length = self._read_data_length(f) - - if offset+length >= data_length: - # They are expanding their data size. - - if offset+length > self.MAX_SIZE: - raise DataTooLargeError() - - # Their data now fits in the current container. We must write - # their new data and modify the recorded data size. - - # Fill any newly exposed empty space with 0's. - if offset > data_length: - f.seek(self.DATA_OFFSET+data_length) - f.write('\x00'*(offset - data_length)) - f.flush() - - new_data_length = offset+length - self._write_data_length(f, new_data_length) - # an interrupt here will result in a corrupted share - - # now all that's left to do is write out their data - f.seek(self.DATA_OFFSET+offset) - f.write(data) - return - - def _read_write_enabler_and_nodeid(self, f): - f.seek(0) - data = f.read(self.HEADER_SIZE) - (magic, - write_enabler_nodeid, write_enabler, - data_length, extra_least_offset) = \ - struct.unpack(">32s20s32sQQ", data) - assert magic == self.MAGIC - return (write_enabler, write_enabler_nodeid) - - def readv(self, readv): - datav = [] - f = open(self.home, 'rb') - for (offset, length) in readv: - datav.append(self._read_share_data(f, offset, length)) - f.close() - return datav - - def check_write_enabler(self, write_enabler, si_s): - f = open(self.home, 'rb+') - (real_write_enabler, write_enabler_nodeid) = \ - self._read_write_enabler_and_nodeid(f) - f.close() - # avoid a timing attack - #if write_enabler != real_write_enabler: - if not timing_safe_compare(write_enabler, real_write_enabler): - # accomodate share migration by reporting the nodeid used for the - # old write enabler. - self.log(format="bad write enabler on SI %(si)s," - " recorded by nodeid %(nodeid)s", - facility="tahoe.storage", - level=log.WEIRD, umid="cE1eBQ", - si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid)) - msg = "The write enabler was recorded by nodeid '%s'." % \ - (idlib.nodeid_b2a(write_enabler_nodeid),) - raise BadWriteEnablerError(msg) - - def check_testv(self, testv): - test_good = True - f = open(self.home, 'rb+') - for (offset, length, operator, specimen) in testv: - data = self._read_share_data(f, offset, length) - if not testv_compare(data, operator, specimen): - test_good = False - break - f.close() - return test_good - - def writev(self, datav, new_length): - f = open(self.home, 'rb+') - for (offset, data) in datav: - self._write_share_data(f, offset, data) - if new_length is not None: - cur_length = self._read_data_length(f) - if new_length < cur_length: - self._write_data_length(f, new_length) - # TODO: shrink the share file. - f.close() - -def testv_compare(a, op, b): - assert op in ("lt", "le", "eq", "ne", "ge", "gt") - if op == "lt": - return a < b - if op == "le": - return a <= b - if op == "eq": - return a == b - if op == "ne": - return a != b - if op == "ge": - return a >= b - if op == "gt": - return a > b - # never reached - -class EmptyShare: - - def check_testv(self, testv): - test_good = True - for (offset, length, operator, specimen) in testv: - data = "" - if not testv_compare(data, operator, specimen): - test_good = False - break - return test_good - -def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent): - ms = MutableShareFile(filename, parent) - ms.create(my_nodeid, write_enabler) - del ms - return MutableShareFile(filename, parent) - diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 857ea21a..2403e11b 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -10,10 +10,10 @@ import allmydata # for __full_version__ from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported -from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ +from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE -from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader +from allmydata.storage.backends.disk.immutable import ShareFile, BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.accountant import Accountant from allmydata.storage.expiration import ExpirationPolicy diff --git a/src/allmydata/storage/shares.py b/src/allmydata/storage/shares.py index 558bddc1..bcdf98c4 100644 --- a/src/allmydata/storage/shares.py +++ b/src/allmydata/storage/shares.py @@ -1,7 +1,7 @@ #! /usr/bin/python -from allmydata.storage.mutable import MutableShareFile -from allmydata.storage.immutable import ShareFile +from allmydata.storage.backends.disk.mutable import MutableShareFile +from allmydata.storage.backends.disk.immutable import ShareFile def get_share_file(filename): f = open(filename, "rb") diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 311a82c6..fe4d2aa0 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -17,7 +17,7 @@ from allmydata.check_results import CheckResults, CheckAndRepairResults, \ from allmydata.storage_client import StubServer from allmydata.mutable.layout import unpack_header from allmydata.mutable.publish import MutableData -from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.backends.disk.mutable import MutableShareFile from allmydata.util import hashutil, log, fileutil, pollmixin from allmydata.util.assertutil import precondition from allmydata.util.consumer import download_to_data diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 4d65a377..1bae64ac 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -11,8 +11,8 @@ import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, base32, time_format from allmydata.storage.server import StorageServer -from allmydata.storage.mutable import MutableShareFile -from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile +from allmydata.storage.backends.disk.mutable import MutableShareFile +from allmydata.storage.backends.disk.immutable import BucketWriter, BucketReader, ShareFile from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index dc3df3da..42afc961 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -8,7 +8,7 @@ from twisted.internet import threads # CLI tests use deferToThread import allmydata from allmydata import uri -from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.backends.disk.mutable import MutableShareFile from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload from allmydata.immutable.literal import LiteralFileNode