From: Daira Hopwood Date: Tue, 8 Apr 2014 23:56:58 +0000 (+0100) Subject: Changes to Bucket{Reader,Writer} and disk backend (rebased). X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/...?a=commitdiff_plain;h=3288ca3ca6777f5e27ad99e0d7a9e1be4641438e;p=tahoe-lafs%2Ftahoe-lafs.git Changes to Bucket{Reader,Writer} and disk backend (rebased). Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index b938794f..7e20996f 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -71,6 +71,11 @@ HEADER_LENGTH = struct.calcsize(HEADER) OFFSETS = ">LLLLQQ" OFFSETS_LENGTH = struct.calcsize(OFFSETS) +# our sharefiles share with a recognizable string, plus some random +# binary data to reduce the chance that a regular text file will look +# like a sharefile. +MUTABLE_MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" + MAX_MUTABLE_SHARE_SIZE = 69105*1000*1000*1000*1000 # 69105 TB, kind of arbitrary diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py index a8b0fc08..b7bba0db 100644 --- a/src/allmydata/storage/backends/disk/immutable.py +++ b/src/allmydata/storage/backends/disk/immutable.py @@ -1,15 +1,19 @@ -import os, struct +import os, os.path, struct + +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IShareForReading, IShareForWriting from allmydata.util import fileutil -from allmydata.util.fileutil import get_used_space -from allmydata.util.assertutil import precondition -from allmydata.storage.common import UnknownImmutableContainerVersionError, \ +from allmydata.util.assertutil import precondition, _assert +from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, \ DataTooLargeError -# Each share file (in storage/shares/$SI/$SHNUM) contains share data that -# can be accessed by RIBucketWriter.write and RIBucketReader.read . +# Each share file (in storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM) contains +# share data that can be accessed by RIBucketWriter.write and RIBucketReader.read . # The share file has the following layout: # 0x00: share file version number, four bytes, current version is 1 @@ -25,25 +29,41 @@ from allmydata.storage.common import UnknownImmutableContainerVersionError, \ # over when the server was upgraded. All lease information is now kept in the # leasedb. +class ImmutableDiskShare(object): + implements(IShareForReading, IShareForWriting) -class ShareFile: sharetype = "immutable" LEASE_SIZE = struct.calcsize(">L32s32sL") HEADER = ">LLL" HEADER_SIZE = struct.calcsize(HEADER) DATA_OFFSET = HEADER_SIZE - def __init__(self, filename, max_size=None, create=False): - """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ - precondition((max_size is not None) or (not create), max_size, create) - self.home = filename - self._max_size = max_size - if create: - # touch the file, so later callers will see that we're working on + def __init__(self, home, storage_index, shnum, finalhome=None, allocated_data_length=None): + """ + If allocated_data_length is not None then I won't allow more than allocated_data_length + to be written to me. + If finalhome is not None (meaning that we are creating the share) then allocated_data_length + must not be None. + + Clients should use the load_immutable_disk_share and create_immutable_disk_share + factory functions rather than creating instances directly. + """ + precondition((allocated_data_length is not None) or (finalhome is None), + allocated_data_length=allocated_data_length, finalhome=finalhome) + self._storage_index = storage_index + self._allocated_data_length = allocated_data_length + + # If we are creating the share, _finalhome refers to the final path and + # _home to the incoming path. Otherwise, _finalhome is None. + self._finalhome = finalhome + self._home = home + self._shnum = shnum + + if self._finalhome is not None: + # Touch the file, so later callers will see that we're working on # it. Also construct the metadata. - assert not os.path.exists(self.home) - fileutil.make_dirs(os.path.dirname(self.home)) - f = open(self.home, 'wb') + _assert(not os.path.exists(self._finalhome), finalhome=self._finalhome) + fileutil.make_dirs(os.path.dirname(self._home)) # The second field -- the four-byte share data length -- is no # longer used as of Tahoe v1.3.0, but we continue to write it in # there in case someone downgrades a storage server from >= @@ -53,36 +73,99 @@ class ShareFile: # the largest length that can fit into the field. That way, even # if this does happen, the old < v1.3.0 server will still allow # clients to read the first part of the share. - f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0)) - f.close() - self._data_length = max_size + fileutil.write(self._home, struct.pack(self.HEADER, 1, min(2**32-1, allocated_data_length), 0)) + self._data_length = allocated_data_length else: - f = open(self.home, 'rb') + f = open(self._home, 'rb') try: (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE)) finally: f.close() if version != 1: - msg = "sharefile %s had version %d but we wanted 1" % \ - (filename, version) + msg = "sharefile %r had version %d but we wanted 1" % (self._home, version) raise UnknownImmutableContainerVersionError(msg) - filesize = os.stat(self.home).st_size + filesize = os.stat(self._home).st_size self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE) # TODO: raise a better exception. - assert self._data_length >= 0, self._data_length + _assert(self._data_length >= 0, data_length=self._data_length) + + def __repr__(self): + return ("" + % (si_b2a(self._storage_index or ""), self._shnum, self._home)) + + def close(self): + fileutil.make_dirs(os.path.dirname(self._finalhome)) + fileutil.move_into_place(self._home, self._finalhome) + + # self._home is like storage/shares/incoming/ab/abcde/4 . + # We try to delete the parent (.../ab/abcde) to avoid leaving + # these directories lying around forever, but the delete might + # fail if we're working on another share for the same storage + # index (like ab/abcde/5). The alternative approach would be to + # use a hierarchy of objects (PrefixHolder, BucketHolder, + # ShareWriter), each of which is responsible for a single + # directory on disk, and have them use reference counting of + # their children to know when they should do the rmdir. This + # approach is simpler, but relies on os.rmdir (used by + # rmdir_if_empty) refusing to delete a non-empty directory. + # Do *not* use fileutil.remove() here! + parent = os.path.dirname(self._home) + fileutil.rmdir_if_empty(parent) + + # we also delete the grandparent (prefix) directory, .../ab , + # again to avoid leaving directories lying around. This might + # fail if there is another bucket open that shares a prefix (like + # ab/abfff). + fileutil.rmdir_if_empty(os.path.dirname(parent)) + + # we leave the great-grandparent (incoming/) directory in place. + + self._home = self._finalhome + self._finalhome = None + return defer.succeed(None) def get_used_space(self): - return get_used_space(self.home) + return (fileutil.get_used_space(self._finalhome) + + fileutil.get_used_space(self._home)) + + def get_storage_index(self): + return self._storage_index + + def get_storage_index_string(self): + return si_b2a(self._storage_index) + + def get_shnum(self): + return self._shnum def unlink(self): - os.unlink(self.home) + fileutil.remove(self._home) + return defer.succeed(None) + + def get_allocated_data_length(self): + return self._allocated_data_length def get_size(self): - return os.stat(self.home).st_size + return os.stat(self._home).st_size - def read_share_data(self, offset, length): + def get_data_length(self): + return self._data_length + + def readv(self, readv): + datav = [] + f = open(self._home, 'rb') + try: + for (offset, length) in readv: + datav.append(self._read_share_data(f, offset, length)) + finally: + f.close() + return defer.succeed(datav) + + def _get_path(self): + return self._home + + def _read_share_data(self, f, offset, length): precondition(offset >= 0) # Reads beyond the end of the data are truncated. Reads that start @@ -91,23 +174,35 @@ class ShareFile: actuallength = max(0, min(length, self._data_length - offset)) if actuallength == 0: return "" - f = open(self.home, 'rb') + f.seek(seekpos) + return f.read(actuallength) + + def read_share_data(self, offset, length): + f = open(self._home, 'rb') try: - f.seek(seekpos) - return f.read(actuallength) + return defer.succeed(self._read_share_data(f, offset, length)) finally: f.close() def write_share_data(self, offset, data): length = len(data) precondition(offset >= 0, offset) - if self._max_size is not None and offset+length > self._max_size: - raise DataTooLargeError(self._max_size, offset, length) - f = open(self.home, 'rb+') + if self._allocated_data_length is not None and offset+length > self._allocated_data_length: + raise DataTooLargeError(self._allocated_data_length, offset, length) + f = open(self._home, 'rb+') try: real_offset = self.DATA_OFFSET + offset f.seek(real_offset) - assert f.tell() == real_offset + _assert(f.tell() == real_offset) f.write(data) + return defer.succeed(None) finally: f.close() + + +def load_immutable_disk_share(home, storage_index=None, shnum=None): + return ImmutableDiskShare(home, storage_index=storage_index, shnum=shnum) + +def create_immutable_disk_share(home, finalhome, allocated_data_length, storage_index=None, shnum=None): + return ImmutableDiskShare(home, finalhome=finalhome, allocated_data_length=allocated_data_length, + storage_index=storage_index, shnum=shnum) diff --git a/src/allmydata/storage/backends/disk/mutable.py b/src/allmydata/storage/backends/disk/mutable.py index 4d834958..8eaadd25 100644 --- a/src/allmydata/storage/backends/disk/mutable.py +++ b/src/allmydata/storage/backends/disk/mutable.py @@ -1,18 +1,22 @@ import os, struct -from allmydata.interfaces import BadWriteEnablerError -from allmydata.util import idlib, log -from allmydata.util.assertutil import precondition +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import IMutableShare, BadWriteEnablerError + +from allmydata.util import fileutil, idlib, log +from allmydata.util.assertutil import precondition, _assert from allmydata.util.hashutil import timing_safe_compare -from allmydata.util.fileutil import get_used_space -from allmydata.storage.common import UnknownMutableContainerVersionError, \ +from allmydata.storage.common import si_b2a, UnknownMutableContainerVersionError, \ DataTooLargeError -from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE +from allmydata.storage.backends.base import testv_compare +from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE -# the MutableShareFile is like the ShareFile, but used for mutable data. It -# has a different layout. See docs/mutable.txt for more details. +# MutableDiskShare is like ImmutableDiskShare, but used for mutable data. +# Mutable shares have a different layout. See docs/mutable.rst for more details. # # offset size name # 1 0 32 magic verstr "tahoe mutable container v1" plus binary @@ -20,86 +24,119 @@ from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE # 3 52 32 write enabler # 4 84 8 data size (actual share data present) (a) # 5 92 8 offset of (8) count of extra leases (after data) -# 6 100 368 four leases, 92 bytes each -# 0 4 ownerid (0 means "no lease here") -# 4 4 expiration timestamp -# 8 32 renewal token -# 40 32 cancel token -# 72 20 nodeid which accepted the tokens +# 6 100 368 four leases, 92 bytes each (ignored) # 7 468 (a) data # 8 ?? 4 count of extra leases -# 9 ?? n*92 extra leases +# 9 ?? n*92 extra leases (ignored) -# The struct module doc says that L's are 4 bytes in size., and that Q's are +# The struct module doc says that L's are 4 bytes in size, and that Q's are # 8 bytes in size. Since compatibility depends upon this, double-check it. assert struct.calcsize(">L") == 4, struct.calcsize(">L") assert struct.calcsize(">Q") == 8, struct.calcsize(">Q") -class MutableShareFile: + +class MutableDiskShare(object): + implements(IMutableShare) sharetype = "mutable" DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") - HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases + EXTRA_LEASE_COUNT_OFFSET = DATA_LENGTH_OFFSET + 8 + HEADER = ">32s20s32sQQ" + HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases LEASE_SIZE = struct.calcsize(">LL32s32s20s") - assert LEASE_SIZE == 92 + assert LEASE_SIZE == 92, LEASE_SIZE DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE assert DATA_OFFSET == 468, DATA_OFFSET - # our sharefiles share with a recognizable string, plus some random - # binary data to reduce the chance that a regular text file will look - # like a sharefile. - MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" + + MAGIC = MUTABLE_MAGIC assert len(MAGIC) == 32 MAX_SIZE = MAX_MUTABLE_SHARE_SIZE - # TODO: decide upon a policy for max share size - def __init__(self, filename, parent=None): - self.home = filename - if os.path.exists(self.home): + def __init__(self, home, storage_index, shnum, parent=None): + """ + Clients should use the load_mutable_disk_share and create_mutable_disk_share + factory functions rather than creating instances directly. + """ + self._storage_index = storage_index + self._shnum = shnum + self._home = home + if os.path.exists(self._home): # we don't cache anything, just check the magic - f = open(self.home, 'rb') - data = f.read(self.HEADER_SIZE) - (magic, - write_enabler_nodeid, write_enabler, - data_length, extra_lease_offset) = \ - struct.unpack(">32s20s32sQQ", data) - if magic != self.MAGIC: - msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ - (filename, magic, self.MAGIC) - raise UnknownMutableContainerVersionError(msg) + f = open(self._home, 'rb') + try: + data = f.read(self.HEADER_SIZE) + (magic, + _write_enabler_nodeid, _write_enabler, + _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data) + if magic != self.MAGIC: + msg = "sharefile %r had magic '%r' but we wanted '%r'" % \ + (self._home, magic, self.MAGIC) + raise UnknownMutableContainerVersionError(msg) + finally: + f.close() self.parent = parent # for logging def log(self, *args, **kwargs): - return self.parent.log(*args, **kwargs) + if self.parent: + return self.parent.log(*args, **kwargs) - def create(self, my_nodeid, write_enabler): - assert not os.path.exists(self.home) + def create(self, serverid, write_enabler): + _assert(not os.path.exists(self._home), "%r already exists and should not" % (self._home,)) data_length = 0 - extra_lease_offset = (self.HEADER_SIZE - + 4 * self.LEASE_SIZE - + data_length) - assert extra_lease_offset == self.DATA_OFFSET # true at creation + extra_lease_count_offset = (self.HEADER_SIZE + + 4 * self.LEASE_SIZE + + data_length) + assert extra_lease_count_offset == self.DATA_OFFSET # true at creation num_extra_leases = 0 - f = open(self.home, 'wb') - header = struct.pack(">32s20s32sQQ", - self.MAGIC, my_nodeid, write_enabler, - data_length, extra_lease_offset, - ) - leases = ("\x00"*self.LEASE_SIZE) * 4 - f.write(header + leases) - # data goes here, empty after creation - f.write(struct.pack(">L", num_extra_leases)) - # extra leases go here, none at creation - f.close() + f = open(self._home, 'wb') + try: + header = struct.pack(self.HEADER, + self.MAGIC, serverid, write_enabler, + data_length, extra_lease_count_offset, + ) + leases = ("\x00"*self.LEASE_SIZE) * 4 + f.write(header + leases) + # data goes here, empty after creation + f.write(struct.pack(">L", num_extra_leases)) + # extra leases go here, none at creation + finally: + f.close() + return self + + def __repr__(self): + return ("" + % (si_b2a(self._storage_index or ""), self._shnum, self._home)) + + def get_size(self): + return os.stat(self._home).st_size + + def get_data_length(self): + f = open(self._home, 'rb') + try: + data_length = self._read_data_length(f) + finally: + f.close() + return data_length def get_used_space(self): - return get_used_space(self.home) + return fileutil.get_used_space(self._home) + + def get_storage_index(self): + return self._storage_index + + def get_storage_index_string(self): + return si_b2a(self._storage_index) + + def get_shnum(self): + return self._shnum def unlink(self): - os.unlink(self.home) + fileutil.remove(self._home) + return defer.succeed(None) - def get_size(self): - return os.stat(self.home).st_size + def _get_path(self): + return self._home def _read_data_length(self, f): f.seek(self.DATA_LENGTH_OFFSET) @@ -107,34 +144,36 @@ class MutableShareFile: return data_length def _read_container_size(self, f): - f.seek(self.DATA_LENGTH_OFFSET + 8) - (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) - return extra_lease_offset - self.DATA_OFFSET + f.seek(self.EXTRA_LEASE_COUNT_OFFSET) + (extra_lease_count_offset,) = struct.unpack(">Q", f.read(8)) + return extra_lease_count_offset - self.DATA_OFFSET def _write_data_length(self, f, data_length): - extra_lease_offset = self.DATA_OFFSET + data_length + extra_lease_count_offset = self.DATA_OFFSET + data_length f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">QQ", data_length, extra_lease_offset)) - f.seek(extra_lease_offset) + f.write(struct.pack(">QQ", data_length, extra_lease_count_offset)) + f.seek(extra_lease_count_offset) f.write(struct.pack(">L", 0)) def _read_share_data(self, f, offset, length): - precondition(offset >= 0) + precondition(offset >= 0, offset=offset) data_length = self._read_data_length(f) - if offset+length > data_length: + if offset + length > data_length: # reads beyond the end of the data are truncated. Reads that # start beyond the end of the data return an empty string. - length = max(0, data_length-offset) + length = max(0, data_length - offset) if length == 0: return "" - precondition(offset+length <= data_length) + precondition(offset + length <= data_length) f.seek(self.DATA_OFFSET+offset) data = f.read(length) return data def _write_share_data(self, f, offset, data): length = len(data) - precondition(offset >= 0) + precondition(offset >= 0, offset=offset) + precondition(offset + length < self.MAX_SIZE, offset=offset, length=length) + data_length = self._read_data_length(f) if offset+length >= data_length: @@ -148,16 +187,16 @@ class MutableShareFile: # Fill any newly exposed empty space with 0's. if offset > data_length: - f.seek(self.DATA_OFFSET+data_length) + f.seek(self.DATA_OFFSET + data_length) f.write('\x00'*(offset - data_length)) f.flush() - new_data_length = offset+length + new_data_length = offset + length self._write_data_length(f, new_data_length) # an interrupt here will result in a corrupted share # now all that's left to do is write out their data - f.seek(self.DATA_OFFSET+offset) + f.seek(self.DATA_OFFSET + offset) f.write(data) return @@ -166,90 +205,84 @@ class MutableShareFile: data = f.read(self.HEADER_SIZE) (magic, write_enabler_nodeid, write_enabler, - data_length, extra_least_offset) = \ - struct.unpack(">32s20s32sQQ", data) + _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data) assert magic == self.MAGIC return (write_enabler, write_enabler_nodeid) def readv(self, readv): datav = [] - f = open(self.home, 'rb') - for (offset, length) in readv: - datav.append(self._read_share_data(f, offset, length)) - f.close() - return datav - - def check_write_enabler(self, write_enabler, si_s): - f = open(self.home, 'rb+') - (real_write_enabler, write_enabler_nodeid) = \ - self._read_write_enabler_and_nodeid(f) - f.close() + f = open(self._home, 'rb') + try: + for (offset, length) in readv: + datav.append(self._read_share_data(f, offset, length)) + finally: + f.close() + return defer.succeed(datav) + + def check_write_enabler(self, write_enabler): + f = open(self._home, 'rb+') + try: + (real_write_enabler, write_enabler_nodeid) = self._read_write_enabler_and_nodeid(f) + finally: + f.close() # avoid a timing attack - #if write_enabler != real_write_enabler: if not timing_safe_compare(write_enabler, real_write_enabler): # accomodate share migration by reporting the nodeid used for the # old write enabler. - self.log(format="bad write enabler on SI %(si)s," - " recorded by nodeid %(nodeid)s", - facility="tahoe.storage", - level=log.WEIRD, umid="cE1eBQ", - si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid)) - msg = "The write enabler was recorded by nodeid '%s'." % \ - (idlib.nodeid_b2a(write_enabler_nodeid),) - raise BadWriteEnablerError(msg) + def _bad_write_enabler(): + nodeid_s = idlib.nodeid_b2a(write_enabler_nodeid) + self.log(format="bad write enabler on SI %(si)s," + " recorded by nodeid %(nodeid)s", + facility="tahoe.storage", + level=log.WEIRD, umid="cE1eBQ", + si=self.get_storage_index_string(), + nodeid=nodeid_s) + raise BadWriteEnablerError("The write enabler was recorded by nodeid '%s'." + % (nodeid_s,)) + return defer.execute(_bad_write_enabler) + return defer.succeed(None) def check_testv(self, testv): test_good = True - f = open(self.home, 'rb+') - for (offset, length, operator, specimen) in testv: - data = self._read_share_data(f, offset, length) - if not testv_compare(data, operator, specimen): - test_good = False - break - f.close() - return test_good + f = open(self._home, 'rb+') + try: + for (offset, length, operator, specimen) in testv: + data = self._read_share_data(f, offset, length) + if not testv_compare(data, operator, specimen): + test_good = False + break + finally: + f.close() + return defer.succeed(test_good) def writev(self, datav, new_length): - f = open(self.home, 'rb+') - for (offset, data) in datav: - self._write_share_data(f, offset, data) - if new_length is not None: - cur_length = self._read_data_length(f) - if new_length < cur_length: - self._write_data_length(f, new_length) - # TODO: shrink the share file. - f.close() - -def testv_compare(a, op, b): - assert op in ("lt", "le", "eq", "ne", "ge", "gt") - if op == "lt": - return a < b - if op == "le": - return a <= b - if op == "eq": - return a == b - if op == "ne": - return a != b - if op == "ge": - return a >= b - if op == "gt": - return a > b - # never reached - -class EmptyShare: + precondition(new_length is None or new_length >= 0, new_length=new_length) - def check_testv(self, testv): - test_good = True - for (offset, length, operator, specimen) in testv: - data = "" - if not testv_compare(data, operator, specimen): - test_good = False - break - return test_good - -def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent): - ms = MutableShareFile(filename, parent) - ms.create(my_nodeid, write_enabler) - del ms - return MutableShareFile(filename, parent) + for (offset, data) in datav: + precondition(offset >= 0, offset=offset) + if offset + len(data) > self.MAX_SIZE: + raise DataTooLargeError() + f = open(self._home, 'rb+') + try: + for (offset, data) in datav: + self._write_share_data(f, offset, data) + if new_length is not None: + cur_length = self._read_data_length(f) + if new_length < cur_length: + self._write_data_length(f, new_length) + # TODO: shrink the share file. + finally: + f.close() + return defer.succeed(None) + + def close(self): + return defer.succeed(None) + + +def load_mutable_disk_share(home, storage_index=None, shnum=None, parent=None): + return MutableDiskShare(home, storage_index, shnum, parent) + +def create_mutable_disk_share(home, serverid, write_enabler, storage_index=None, shnum=None, parent=None): + ms = MutableDiskShare(home, storage_index, shnum, parent) + return ms.create(serverid, write_enabler) diff --git a/src/allmydata/storage/bucket.py b/src/allmydata/storage/bucket.py index 1cf99cbf..e63ad462 100644 --- a/src/allmydata/storage/bucket.py +++ b/src/allmydata/storage/bucket.py @@ -1,146 +1,131 @@ -import os, time +import time from foolscap.api import Referenceable +from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIBucketWriter, RIBucketReader -from allmydata.util import base32, fileutil, log -from allmydata.util.fileutil import get_used_space +from allmydata.util import base32, log from allmydata.util.assertutil import precondition -from allmydata.storage.backends.disk.immutable import ShareFile from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE - class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, ss, account, storage_index, shnum, - incominghome, finalhome, max_size, canary): - self.ss = ss - self.incominghome = incominghome - self.finalhome = finalhome - self._max_size = max_size # don't allow the client to write more than this + def __init__(self, account, share, canary): + self.ss = account.server self._account = account - self._storage_index = storage_index - self._shnum = shnum + self._share = share self._canary = canary self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False self.throw_out_all_data = False - self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) - self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) + + self._account.add_share(share.get_storage_index(), share.get_shnum(), + share.get_allocated_data_length(), SHARETYPE_IMMUTABLE) def allocated_size(self): - return self._max_size + return self._share.get_allocated_data_length() + + def _add_latency(self, res, name, start): + self.ss.add_latency(name, time.time() - start) + self.ss.count(name) + return res def remote_write(self, offset, data): start = time.time() precondition(not self.closed) if self.throw_out_all_data: - return - self._sharefile.write_share_data(offset, data) - self.ss.add_latency("write", time.time() - start) - self.ss.count("write") + return defer.succeed(None) + d = self._share.write_share_data(offset, data) + d.addBoth(self._add_latency, "write", start) + return d def remote_close(self): precondition(not self.closed) start = time.time() - fileutil.make_dirs(os.path.dirname(self.finalhome)) - fileutil.rename(self.incominghome, self.finalhome) - try: - # self.incominghome is like storage/shares/incoming/ab/abcde/4 . - # We try to delete the parent (.../ab/abcde) to avoid leaving - # these directories lying around forever, but the delete might - # fail if we're working on another share for the same storage - # index (like ab/abcde/5). The alternative approach would be to - # use a hierarchy of objects (PrefixHolder, BucketHolder, - # ShareWriter), each of which is responsible for a single - # directory on disk, and have them use reference counting of - # their children to know when they should do the rmdir. This - # approach is simpler, but relies on os.rmdir refusing to delete - # a non-empty directory. Do *not* use fileutil.rm_dir() here! - os.rmdir(os.path.dirname(self.incominghome)) - # we also delete the grandparent (prefix) directory, .../ab , - # again to avoid leaving directories lying around. This might - # fail if there is another bucket open that shares a prefix (like - # ab/abfff). - os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) - # we leave the great-grandparent (incoming/) directory in place. - except EnvironmentError: - # ignore the "can't rmdir because the directory is not empty" - # exceptions, those are normal consequences of the - # above-mentioned conditions. - pass - self._sharefile = None - self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - - filelen = get_used_space(self.finalhome) - self.ss.bucket_writer_closed(self, filelen) - self._account.add_or_renew_default_lease(self._storage_index, self._shnum) - self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) - self.ss.add_latency("close", time.time() - start) - self.ss.count("close") + d = defer.succeed(None) + d.addCallback(lambda ign: self._share.close()) + d.addCallback(lambda ign: self._share.get_used_space()) + def _got_used_space(used_space): + storage_index = self._share.get_storage_index() + shnum = self._share.get_shnum() + self._share = None + self.closed = True + self._canary.dontNotifyOnDisconnect(self._disconnect_marker) + + self.ss.bucket_writer_closed(self, used_space) + self._account.add_or_renew_default_lease(storage_index, shnum) + self._account.mark_share_as_stable(storage_index, shnum, used_space) + d.addCallback(_got_used_space) + d.addBoth(self._add_latency, "close", start) + return d def _disconnected(self): if not self.closed: - self._abort() + return self._abort() + return defer.succeed(None) def remote_abort(self): - log.msg("storage: aborting sharefile %s" % self.incominghome, + log.msg("storage: aborting write to share %r" % self._share, facility="tahoe.storage", level=log.UNUSUAL) if not self.closed: self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - self._abort() - self.ss.count("abort") + d = self._abort() + def _count(ign): + self.ss.count("abort") + d.addBoth(_count) + return d def _abort(self): + d = defer.succeed(None) if self.closed: - return - - os.remove(self.incominghome) - # if we were the last share to be moved, remove the incoming/ - # directory that was our parent - parentdir = os.path.split(self.incominghome)[0] - if not os.listdir(parentdir): - os.rmdir(parentdir) - self._sharefile = None - self._account.remove_share_and_leases(self._storage_index, self._shnum) + return d + d.addCallback(lambda ign: self._share.unlink()) + def _unlinked(ign): + self._share = None - # We are now considered closed for further writing. We must tell - # the storage server about this so that it stops expecting us to - # use the space it allocated for us earlier. - self.closed = True - self.ss.bucket_writer_closed(self, 0) + # We are now considered closed for further writing. We must tell + # the storage server about this so that it stops expecting us to + # use the space it allocated for us earlier. + self.closed = True + self.ss.bucket_writer_closed(self, 0) + d.addCallback(_unlinked) + return d class BucketReader(Referenceable): implements(RIBucketReader) - def __init__(self, ss, sharefname, storage_index=None, shnum=None): - self.ss = ss - self._share_file = ShareFile(sharefname) - self.storage_index = storage_index - self.shnum = shnum + def __init__(self, account, share): + self.ss = account.server + self._account = account + self._share = share + self.storage_index = share.get_storage_index() + self.shnum = share.get_shnum() def __repr__(self): return "<%s %s %s>" % (self.__class__.__name__, base32.b2a_l(self.storage_index[:8], 60), self.shnum) + def _add_latency(self, res, name, start): + self.ss.add_latency(name, time.time() - start) + self.ss.count(name) + return res + def remote_read(self, offset, length): start = time.time() - data = self._share_file.read_share_data(offset, length) - self.ss.add_latency("read", time.time() - start) - self.ss.count("read") - return data + d = self._share.read_share_data(offset, length) + d.addBoth(self._add_latency, "read", start) + return d def remote_advise_corrupt_share(self, reason): - return self.ss.client_advise_corrupt_share("immutable", - self.storage_index, - self.shnum, - reason) + return self._account.remote_advise_corrupt_share("immutable", + self.storage_index, + self.shnum, + reason) diff --git a/src/allmydata/storage/shares.py b/src/allmydata/storage/shares.py deleted file mode 100644 index bcdf98c4..00000000 --- a/src/allmydata/storage/shares.py +++ /dev/null @@ -1,14 +0,0 @@ -#! /usr/bin/python - -from allmydata.storage.backends.disk.mutable import MutableShareFile -from allmydata.storage.backends.disk.immutable import ShareFile - -def get_share_file(filename): - f = open(filename, "rb") - prefix = f.read(32) - f.close() - if prefix == MutableShareFile.MAGIC: - return MutableShareFile(filename) - # otherwise assume it's immutable - return ShareFile(filename) - diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index fe4d2aa0..0d581755 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -17,7 +17,7 @@ from allmydata.check_results import CheckResults, CheckAndRepairResults, \ from allmydata.storage_client import StubServer from allmydata.mutable.layout import unpack_header from allmydata.mutable.publish import MutableData -from allmydata.storage.backends.disk.mutable import MutableShareFile +from allmydata.storage.backends.disk.mutable import MutableDiskShare from allmydata.util import hashutil, log, fileutil, pollmixin from allmydata.util.assertutil import precondition from allmydata.util.consumer import download_to_data @@ -1307,8 +1307,8 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False): def _corrupt_mutable_share_data(data, debug=False): prefix = data[:32] - assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC) - data_offset = MutableShareFile.DATA_OFFSET + assert prefix == MutableDiskShare.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableDiskShare.MAGIC) + data_offset = MutableDiskShare.DATA_OFFSET sharetype = data[data_offset:data_offset+1] assert sharetype == "\x00", "non-SDMF mutable shares not supported" (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,