OFFSETS = ">LLLLQQ"
OFFSETS_LENGTH = struct.calcsize(OFFSETS)
+# our sharefiles start with a recognizable string, plus some random
+# binary data to reduce the chance that a regular text file will look
+# like a sharefile.
+MUTABLE_MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+
MAX_MUTABLE_SHARE_SIZE = 69105*1000*1000*1000*1000 # 69105 TB, kind of arbitrary
-import os, struct
+import os, os.path, struct
+
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IShareForReading, IShareForWriting
from allmydata.util import fileutil
-from allmydata.util.fileutil import get_used_space
-from allmydata.util.assertutil import precondition
-from allmydata.storage.common import UnknownImmutableContainerVersionError, \
+from allmydata.util.assertutil import precondition, _assert
+from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, \
DataTooLargeError
-# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
-# can be accessed by RIBucketWriter.write and RIBucketReader.read .
+# Each share file (in storage/shares/$PREFIX/$STORAGEINDEX/$SHNUM) contains
+# share data that can be accessed by RIBucketWriter.write and RIBucketReader.read .
# The share file has the following layout:
# 0x00: share file version number, four bytes, current version is 1
# over when the server was upgraded. All lease information is now kept in the
# leasedb.
+class ImmutableDiskShare(object):
+ implements(IShareForReading, IShareForWriting)
-class ShareFile:
sharetype = "immutable"
LEASE_SIZE = struct.calcsize(">L32s32sL")
HEADER = ">LLL"
HEADER_SIZE = struct.calcsize(HEADER)
DATA_OFFSET = HEADER_SIZE
- def __init__(self, filename, max_size=None, create=False):
- """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
- precondition((max_size is not None) or (not create), max_size, create)
- self.home = filename
- self._max_size = max_size
- if create:
- # touch the file, so later callers will see that we're working on
+ def __init__(self, home, storage_index, shnum, finalhome=None, allocated_data_length=None):
+ """
+ If allocated_data_length is not None then I won't allow more than allocated_data_length
+ to be written to me.
+ If finalhome is not None (meaning that we are creating the share) then allocated_data_length
+ must not be None.
+
+ Clients should use the load_immutable_disk_share and create_immutable_disk_share
+ factory functions rather than creating instances directly.
+ """
+ precondition((allocated_data_length is not None) or (finalhome is None),
+ allocated_data_length=allocated_data_length, finalhome=finalhome)
+ self._storage_index = storage_index
+ self._allocated_data_length = allocated_data_length
+
+ # If we are creating the share, _finalhome refers to the final path and
+ # _home to the incoming path. Otherwise, _finalhome is None.
+ self._finalhome = finalhome
+ self._home = home
+ self._shnum = shnum
+
+ if self._finalhome is not None:
+ # Touch the file, so later callers will see that we're working on
# it. Also construct the metadata.
- assert not os.path.exists(self.home)
- fileutil.make_dirs(os.path.dirname(self.home))
- f = open(self.home, 'wb')
+ _assert(not os.path.exists(self._finalhome), finalhome=self._finalhome)
+ fileutil.make_dirs(os.path.dirname(self._home))
# The second field -- the four-byte share data length -- is no
# longer used as of Tahoe v1.3.0, but we continue to write it in
# there in case someone downgrades a storage server from >=
# the largest length that can fit into the field. That way, even
# if this does happen, the old < v1.3.0 server will still allow
# clients to read the first part of the share.
- f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
- f.close()
- self._data_length = max_size
+ fileutil.write(self._home, struct.pack(self.HEADER, 1, min(2**32-1, allocated_data_length), 0))
+ self._data_length = allocated_data_length
else:
- f = open(self.home, 'rb')
+ f = open(self._home, 'rb')
try:
(version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
finally:
f.close()
if version != 1:
- msg = "sharefile %s had version %d but we wanted 1" % \
- (filename, version)
+ msg = "sharefile %r had version %d but we wanted 1" % (self._home, version)
raise UnknownImmutableContainerVersionError(msg)
- filesize = os.stat(self.home).st_size
+ filesize = os.stat(self._home).st_size
self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
# TODO: raise a better exception.
- assert self._data_length >= 0, self._data_length
+ _assert(self._data_length >= 0, data_length=self._data_length)
+
+ def __repr__(self):
+ return ("<ImmutableDiskShare %s:%r at %r>"
+ % (si_b2a(self._storage_index or ""), self._shnum, self._home))
+
+ def close(self):
+ fileutil.make_dirs(os.path.dirname(self._finalhome))
+ fileutil.move_into_place(self._home, self._finalhome)
+
+ # self._home is like storage/shares/incoming/ab/abcde/4 .
+ # We try to delete the parent (.../ab/abcde) to avoid leaving
+ # these directories lying around forever, but the delete might
+ # fail if we're working on another share for the same storage
+ # index (like ab/abcde/5). The alternative approach would be to
+ # use a hierarchy of objects (PrefixHolder, BucketHolder,
+ # ShareWriter), each of which is responsible for a single
+ # directory on disk, and have them use reference counting of
+ # their children to know when they should do the rmdir. This
+ # approach is simpler, but relies on os.rmdir (used by
+ # rmdir_if_empty) refusing to delete a non-empty directory.
+ # Do *not* use fileutil.remove() here!
+ parent = os.path.dirname(self._home)
+ fileutil.rmdir_if_empty(parent)
+
+ # we also delete the grandparent (prefix) directory, .../ab ,
+ # again to avoid leaving directories lying around. This might
+ # fail if there is another bucket open that shares a prefix (like
+ # ab/abfff).
+ fileutil.rmdir_if_empty(os.path.dirname(parent))
+
+ # we leave the great-grandparent (incoming/) directory in place.
+
+ self._home = self._finalhome
+ self._finalhome = None
+ return defer.succeed(None)
def get_used_space(self):
- return get_used_space(self.home)
+ return (fileutil.get_used_space(self._finalhome) +
+ fileutil.get_used_space(self._home))
+
+ def get_storage_index(self):
+ return self._storage_index
+
+ def get_storage_index_string(self):
+ return si_b2a(self._storage_index)
+
+ def get_shnum(self):
+ return self._shnum
def unlink(self):
- os.unlink(self.home)
+ fileutil.remove(self._home)
+ return defer.succeed(None)
+
+ def get_allocated_data_length(self):
+ return self._allocated_data_length
def get_size(self):
- return os.stat(self.home).st_size
+ return os.stat(self._home).st_size
- def read_share_data(self, offset, length):
+ def get_data_length(self):
+ return self._data_length
+
+ def readv(self, readv):
+ datav = []
+ f = open(self._home, 'rb')
+ try:
+ for (offset, length) in readv:
+ datav.append(self._read_share_data(f, offset, length))
+ finally:
+ f.close()
+ return defer.succeed(datav)
+
+ def _get_path(self):
+ return self._home
+
+ def _read_share_data(self, f, offset, length):
precondition(offset >= 0)
# Reads beyond the end of the data are truncated. Reads that start
actuallength = max(0, min(length, self._data_length - offset))
if actuallength == 0:
return ""
- f = open(self.home, 'rb')
+ f.seek(seekpos)
+ return f.read(actuallength)
+
+ def read_share_data(self, offset, length):
+ f = open(self._home, 'rb')
try:
- f.seek(seekpos)
- return f.read(actuallength)
+ return defer.succeed(self._read_share_data(f, offset, length))
finally:
f.close()
def write_share_data(self, offset, data):
length = len(data)
precondition(offset >= 0, offset)
- if self._max_size is not None and offset+length > self._max_size:
- raise DataTooLargeError(self._max_size, offset, length)
- f = open(self.home, 'rb+')
+ if self._allocated_data_length is not None and offset+length > self._allocated_data_length:
+ raise DataTooLargeError(self._allocated_data_length, offset, length)
+ f = open(self._home, 'rb+')
try:
real_offset = self.DATA_OFFSET + offset
f.seek(real_offset)
- assert f.tell() == real_offset
+ _assert(f.tell() == real_offset)
f.write(data)
+ return defer.succeed(None)
finally:
f.close()
+
+
+def load_immutable_disk_share(home, storage_index=None, shnum=None):
+ return ImmutableDiskShare(home, storage_index=storage_index, shnum=shnum)
+
+def create_immutable_disk_share(home, finalhome, allocated_data_length, storage_index=None, shnum=None):
+ return ImmutableDiskShare(home, finalhome=finalhome, allocated_data_length=allocated_data_length,
+ storage_index=storage_index, shnum=shnum)
import os, struct
-from allmydata.interfaces import BadWriteEnablerError
-from allmydata.util import idlib, log
-from allmydata.util.assertutil import precondition
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import IMutableShare, BadWriteEnablerError
+
+from allmydata.util import fileutil, idlib, log
+from allmydata.util.assertutil import precondition, _assert
from allmydata.util.hashutil import timing_safe_compare
-from allmydata.util.fileutil import get_used_space
-from allmydata.storage.common import UnknownMutableContainerVersionError, \
+from allmydata.storage.common import si_b2a, UnknownMutableContainerVersionError, \
DataTooLargeError
-from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.backends.base import testv_compare
+from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
-# the MutableShareFile is like the ShareFile, but used for mutable data. It
-# has a different layout. See docs/mutable.txt for more details.
+# MutableDiskShare is like ImmutableDiskShare, but used for mutable data.
+# Mutable shares have a different layout. See docs/mutable.rst for more details.
# # offset size name
# 1 0 32 magic verstr "tahoe mutable container v1" plus binary
# 3 52 32 write enabler
# 4 84 8 data size (actual share data present) (a)
# 5 92 8 offset of (8) count of extra leases (after data)
-# 6 100 368 four leases, 92 bytes each
-# 0 4 ownerid (0 means "no lease here")
-# 4 4 expiration timestamp
-# 8 32 renewal token
-# 40 32 cancel token
-# 72 20 nodeid which accepted the tokens
+# 6 100 368 four leases, 92 bytes each (ignored)
# 7 468 (a) data
# 8 ?? 4 count of extra leases
-# 9 ?? n*92 extra leases
+# 9 ?? n*92 extra leases (ignored)
-# The struct module doc says that L's are 4 bytes in size., and that Q's are
+# The struct module doc says that L's are 4 bytes in size, and that Q's are
# 8 bytes in size. Since compatibility depends upon this, double-check it.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
-class MutableShareFile:
+
+class MutableDiskShare(object):
+ implements(IMutableShare)
sharetype = "mutable"
DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
- HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
+ EXTRA_LEASE_COUNT_OFFSET = DATA_LENGTH_OFFSET + 8
+ HEADER = ">32s20s32sQQ"
+ HEADER_SIZE = struct.calcsize(HEADER) # doesn't include leases
LEASE_SIZE = struct.calcsize(">LL32s32s20s")
- assert LEASE_SIZE == 92
+ assert LEASE_SIZE == 92, LEASE_SIZE
DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
assert DATA_OFFSET == 468, DATA_OFFSET
- # our sharefiles share with a recognizable string, plus some random
- # binary data to reduce the chance that a regular text file will look
- # like a sharefile.
- MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+
+ MAGIC = MUTABLE_MAGIC
assert len(MAGIC) == 32
MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
- # TODO: decide upon a policy for max share size
- def __init__(self, filename, parent=None):
- self.home = filename
- if os.path.exists(self.home):
+ def __init__(self, home, storage_index, shnum, parent=None):
+ """
+ Clients should use the load_mutable_disk_share and create_mutable_disk_share
+ factory functions rather than creating instances directly.
+ """
+ self._storage_index = storage_index
+ self._shnum = shnum
+ self._home = home
+ if os.path.exists(self._home):
# we don't cache anything, just check the magic
- f = open(self.home, 'rb')
- data = f.read(self.HEADER_SIZE)
- (magic,
- write_enabler_nodeid, write_enabler,
- data_length, extra_lease_offset) = \
- struct.unpack(">32s20s32sQQ", data)
- if magic != self.MAGIC:
- msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
- (filename, magic, self.MAGIC)
- raise UnknownMutableContainerVersionError(msg)
+ f = open(self._home, 'rb')
+ try:
+ data = f.read(self.HEADER_SIZE)
+ (magic,
+ _write_enabler_nodeid, _write_enabler,
+ _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data)
+ if magic != self.MAGIC:
+ msg = "sharefile %r had magic '%r' but we wanted '%r'" % \
+ (self._home, magic, self.MAGIC)
+ raise UnknownMutableContainerVersionError(msg)
+ finally:
+ f.close()
self.parent = parent # for logging
def log(self, *args, **kwargs):
- return self.parent.log(*args, **kwargs)
+ if self.parent:
+ return self.parent.log(*args, **kwargs)
- def create(self, my_nodeid, write_enabler):
- assert not os.path.exists(self.home)
+ def create(self, serverid, write_enabler):
+ _assert(not os.path.exists(self._home), "%r already exists and should not" % (self._home,))
data_length = 0
- extra_lease_offset = (self.HEADER_SIZE
- + 4 * self.LEASE_SIZE
- + data_length)
- assert extra_lease_offset == self.DATA_OFFSET # true at creation
+ extra_lease_count_offset = (self.HEADER_SIZE
+ + 4 * self.LEASE_SIZE
+ + data_length)
+ assert extra_lease_count_offset == self.DATA_OFFSET # true at creation
num_extra_leases = 0
- f = open(self.home, 'wb')
- header = struct.pack(">32s20s32sQQ",
- self.MAGIC, my_nodeid, write_enabler,
- data_length, extra_lease_offset,
- )
- leases = ("\x00"*self.LEASE_SIZE) * 4
- f.write(header + leases)
- # data goes here, empty after creation
- f.write(struct.pack(">L", num_extra_leases))
- # extra leases go here, none at creation
- f.close()
+ f = open(self._home, 'wb')
+ try:
+ header = struct.pack(self.HEADER,
+ self.MAGIC, serverid, write_enabler,
+ data_length, extra_lease_count_offset,
+ )
+ leases = ("\x00"*self.LEASE_SIZE) * 4
+ f.write(header + leases)
+ # data goes here, empty after creation
+ f.write(struct.pack(">L", num_extra_leases))
+ # extra leases go here, none at creation
+ finally:
+ f.close()
+ return self
+
+ def __repr__(self):
+ return ("<MutableDiskShare %s:%r at %r>"
+ % (si_b2a(self._storage_index or ""), self._shnum, self._home))
+
+ def get_size(self):
+ return os.stat(self._home).st_size
+
+ def get_data_length(self):
+ f = open(self._home, 'rb')
+ try:
+ data_length = self._read_data_length(f)
+ finally:
+ f.close()
+ return data_length
def get_used_space(self):
- return get_used_space(self.home)
+ return fileutil.get_used_space(self._home)
+
+ def get_storage_index(self):
+ return self._storage_index
+
+ def get_storage_index_string(self):
+ return si_b2a(self._storage_index)
+
+ def get_shnum(self):
+ return self._shnum
def unlink(self):
- os.unlink(self.home)
+ fileutil.remove(self._home)
+ return defer.succeed(None)
- def get_size(self):
- return os.stat(self.home).st_size
+ def _get_path(self):
+ return self._home
def _read_data_length(self, f):
f.seek(self.DATA_LENGTH_OFFSET)
return data_length
def _read_container_size(self, f):
- f.seek(self.DATA_LENGTH_OFFSET + 8)
- (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
- return extra_lease_offset - self.DATA_OFFSET
+ f.seek(self.EXTRA_LEASE_COUNT_OFFSET)
+ (extra_lease_count_offset,) = struct.unpack(">Q", f.read(8))
+ return extra_lease_count_offset - self.DATA_OFFSET
def _write_data_length(self, f, data_length):
- extra_lease_offset = self.DATA_OFFSET + data_length
+ extra_lease_count_offset = self.DATA_OFFSET + data_length
f.seek(self.DATA_LENGTH_OFFSET)
- f.write(struct.pack(">QQ", data_length, extra_lease_offset))
- f.seek(extra_lease_offset)
+ f.write(struct.pack(">QQ", data_length, extra_lease_count_offset))
+ f.seek(extra_lease_count_offset)
f.write(struct.pack(">L", 0))
def _read_share_data(self, f, offset, length):
- precondition(offset >= 0)
+ precondition(offset >= 0, offset=offset)
data_length = self._read_data_length(f)
- if offset+length > data_length:
+ if offset + length > data_length:
# reads beyond the end of the data are truncated. Reads that
# start beyond the end of the data return an empty string.
- length = max(0, data_length-offset)
+ length = max(0, data_length - offset)
if length == 0:
return ""
- precondition(offset+length <= data_length)
+ precondition(offset + length <= data_length)
f.seek(self.DATA_OFFSET+offset)
data = f.read(length)
return data
def _write_share_data(self, f, offset, data):
length = len(data)
- precondition(offset >= 0)
+ precondition(offset >= 0, offset=offset)
+ precondition(offset + length < self.MAX_SIZE, offset=offset, length=length)
+
data_length = self._read_data_length(f)
if offset+length >= data_length:
# Fill any newly exposed empty space with 0's.
if offset > data_length:
- f.seek(self.DATA_OFFSET+data_length)
+ f.seek(self.DATA_OFFSET + data_length)
f.write('\x00'*(offset - data_length))
f.flush()
- new_data_length = offset+length
+ new_data_length = offset + length
self._write_data_length(f, new_data_length)
# an interrupt here will result in a corrupted share
# now all that's left to do is write out their data
- f.seek(self.DATA_OFFSET+offset)
+ f.seek(self.DATA_OFFSET + offset)
f.write(data)
return
data = f.read(self.HEADER_SIZE)
(magic,
write_enabler_nodeid, write_enabler,
- data_length, extra_least_offset) = \
- struct.unpack(">32s20s32sQQ", data)
+ _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data)
assert magic == self.MAGIC
return (write_enabler, write_enabler_nodeid)
def readv(self, readv):
datav = []
- f = open(self.home, 'rb')
- for (offset, length) in readv:
- datav.append(self._read_share_data(f, offset, length))
- f.close()
- return datav
-
- def check_write_enabler(self, write_enabler, si_s):
- f = open(self.home, 'rb+')
- (real_write_enabler, write_enabler_nodeid) = \
- self._read_write_enabler_and_nodeid(f)
- f.close()
+ f = open(self._home, 'rb')
+ try:
+ for (offset, length) in readv:
+ datav.append(self._read_share_data(f, offset, length))
+ finally:
+ f.close()
+ return defer.succeed(datav)
+
+ def check_write_enabler(self, write_enabler):
+ f = open(self._home, 'rb+')
+ try:
+ (real_write_enabler, write_enabler_nodeid) = self._read_write_enabler_and_nodeid(f)
+ finally:
+ f.close()
# avoid a timing attack
- #if write_enabler != real_write_enabler:
if not timing_safe_compare(write_enabler, real_write_enabler):
# accomodate share migration by reporting the nodeid used for the
# old write enabler.
- self.log(format="bad write enabler on SI %(si)s,"
- " recorded by nodeid %(nodeid)s",
- facility="tahoe.storage",
- level=log.WEIRD, umid="cE1eBQ",
- si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
- msg = "The write enabler was recorded by nodeid '%s'." % \
- (idlib.nodeid_b2a(write_enabler_nodeid),)
- raise BadWriteEnablerError(msg)
+ def _bad_write_enabler():
+ nodeid_s = idlib.nodeid_b2a(write_enabler_nodeid)
+ self.log(format="bad write enabler on SI %(si)s,"
+ " recorded by nodeid %(nodeid)s",
+ facility="tahoe.storage",
+ level=log.WEIRD, umid="cE1eBQ",
+ si=self.get_storage_index_string(),
+ nodeid=nodeid_s)
+ raise BadWriteEnablerError("The write enabler was recorded by nodeid '%s'."
+ % (nodeid_s,))
+ return defer.execute(_bad_write_enabler)
+ return defer.succeed(None)
def check_testv(self, testv):
test_good = True
- f = open(self.home, 'rb+')
- for (offset, length, operator, specimen) in testv:
- data = self._read_share_data(f, offset, length)
- if not testv_compare(data, operator, specimen):
- test_good = False
- break
- f.close()
- return test_good
+ f = open(self._home, 'rb+')
+ try:
+ for (offset, length, operator, specimen) in testv:
+ data = self._read_share_data(f, offset, length)
+ if not testv_compare(data, operator, specimen):
+ test_good = False
+ break
+ finally:
+ f.close()
+ return defer.succeed(test_good)
def writev(self, datav, new_length):
- f = open(self.home, 'rb+')
- for (offset, data) in datav:
- self._write_share_data(f, offset, data)
- if new_length is not None:
- cur_length = self._read_data_length(f)
- if new_length < cur_length:
- self._write_data_length(f, new_length)
- # TODO: shrink the share file.
- f.close()
-
-def testv_compare(a, op, b):
- assert op in ("lt", "le", "eq", "ne", "ge", "gt")
- if op == "lt":
- return a < b
- if op == "le":
- return a <= b
- if op == "eq":
- return a == b
- if op == "ne":
- return a != b
- if op == "ge":
- return a >= b
- if op == "gt":
- return a > b
- # never reached
-
-class EmptyShare:
+ precondition(new_length is None or new_length >= 0, new_length=new_length)
- def check_testv(self, testv):
- test_good = True
- for (offset, length, operator, specimen) in testv:
- data = ""
- if not testv_compare(data, operator, specimen):
- test_good = False
- break
- return test_good
-
-def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
- ms = MutableShareFile(filename, parent)
- ms.create(my_nodeid, write_enabler)
- del ms
- return MutableShareFile(filename, parent)
+ for (offset, data) in datav:
+ precondition(offset >= 0, offset=offset)
+ if offset + len(data) > self.MAX_SIZE:
+ raise DataTooLargeError()
+ f = open(self._home, 'rb+')
+ try:
+ for (offset, data) in datav:
+ self._write_share_data(f, offset, data)
+ if new_length is not None:
+ cur_length = self._read_data_length(f)
+ if new_length < cur_length:
+ self._write_data_length(f, new_length)
+ # TODO: shrink the share file.
+ finally:
+ f.close()
+ return defer.succeed(None)
+
+ def close(self):
+ return defer.succeed(None)
+
+
+def load_mutable_disk_share(home, storage_index=None, shnum=None, parent=None):
+ return MutableDiskShare(home, storage_index, shnum, parent)
+
+def create_mutable_disk_share(home, serverid, write_enabler, storage_index=None, shnum=None, parent=None):
+ ms = MutableDiskShare(home, storage_index, shnum, parent)
+ return ms.create(serverid, write_enabler)
-import os, time
+import time
from foolscap.api import Referenceable
+from twisted.internet import defer
from zope.interface import implements
from allmydata.interfaces import RIBucketWriter, RIBucketReader
-from allmydata.util import base32, fileutil, log
-from allmydata.util.fileutil import get_used_space
+from allmydata.util import base32, log
from allmydata.util.assertutil import precondition
-from allmydata.storage.backends.disk.immutable import ShareFile
from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
-
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, ss, account, storage_index, shnum,
- incominghome, finalhome, max_size, canary):
- self.ss = ss
- self.incominghome = incominghome
- self.finalhome = finalhome
- self._max_size = max_size # don't allow the client to write more than this
+ def __init__(self, account, share, canary):
+ self.ss = account.server
self._account = account
- self._storage_index = storage_index
- self._shnum = shnum
+ self._share = share
self._canary = canary
self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
self.closed = False
self.throw_out_all_data = False
- self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
- self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
+
+ self._account.add_share(share.get_storage_index(), share.get_shnum(),
+ share.get_allocated_data_length(), SHARETYPE_IMMUTABLE)
def allocated_size(self):
- return self._max_size
+ return self._share.get_allocated_data_length()
+
+ def _add_latency(self, res, name, start):
+ self.ss.add_latency(name, time.time() - start)
+ self.ss.count(name)
+ return res
def remote_write(self, offset, data):
start = time.time()
precondition(not self.closed)
if self.throw_out_all_data:
- return
- self._sharefile.write_share_data(offset, data)
- self.ss.add_latency("write", time.time() - start)
- self.ss.count("write")
+ return defer.succeed(None)
+ d = self._share.write_share_data(offset, data)
+ d.addBoth(self._add_latency, "write", start)
+ return d
def remote_close(self):
precondition(not self.closed)
start = time.time()
- fileutil.make_dirs(os.path.dirname(self.finalhome))
- fileutil.rename(self.incominghome, self.finalhome)
- try:
- # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
- # We try to delete the parent (.../ab/abcde) to avoid leaving
- # these directories lying around forever, but the delete might
- # fail if we're working on another share for the same storage
- # index (like ab/abcde/5). The alternative approach would be to
- # use a hierarchy of objects (PrefixHolder, BucketHolder,
- # ShareWriter), each of which is responsible for a single
- # directory on disk, and have them use reference counting of
- # their children to know when they should do the rmdir. This
- # approach is simpler, but relies on os.rmdir refusing to delete
- # a non-empty directory. Do *not* use fileutil.rm_dir() here!
- os.rmdir(os.path.dirname(self.incominghome))
- # we also delete the grandparent (prefix) directory, .../ab ,
- # again to avoid leaving directories lying around. This might
- # fail if there is another bucket open that shares a prefix (like
- # ab/abfff).
- os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
- # we leave the great-grandparent (incoming/) directory in place.
- except EnvironmentError:
- # ignore the "can't rmdir because the directory is not empty"
- # exceptions, those are normal consequences of the
- # above-mentioned conditions.
- pass
- self._sharefile = None
- self.closed = True
- self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
- filelen = get_used_space(self.finalhome)
- self.ss.bucket_writer_closed(self, filelen)
- self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
- self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
- self.ss.add_latency("close", time.time() - start)
- self.ss.count("close")
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self._share.close())
+ d.addCallback(lambda ign: self._share.get_used_space())
+ def _got_used_space(used_space):
+ storage_index = self._share.get_storage_index()
+ shnum = self._share.get_shnum()
+ self._share = None
+ self.closed = True
+ self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+ self.ss.bucket_writer_closed(self, used_space)
+ self._account.add_or_renew_default_lease(storage_index, shnum)
+ self._account.mark_share_as_stable(storage_index, shnum, used_space)
+ d.addCallback(_got_used_space)
+ d.addBoth(self._add_latency, "close", start)
+ return d
def _disconnected(self):
if not self.closed:
- self._abort()
+ return self._abort()
+ return defer.succeed(None)
def remote_abort(self):
- log.msg("storage: aborting sharefile %s" % self.incominghome,
+ log.msg("storage: aborting write to share %r" % self._share,
facility="tahoe.storage", level=log.UNUSUAL)
if not self.closed:
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
- self._abort()
- self.ss.count("abort")
+ d = self._abort()
+ def _count(ign):
+ self.ss.count("abort")
+ d.addBoth(_count)
+ return d
def _abort(self):
+ d = defer.succeed(None)
if self.closed:
- return
-
- os.remove(self.incominghome)
- # if we were the last share to be moved, remove the incoming/
- # directory that was our parent
- parentdir = os.path.split(self.incominghome)[0]
- if not os.listdir(parentdir):
- os.rmdir(parentdir)
- self._sharefile = None
- self._account.remove_share_and_leases(self._storage_index, self._shnum)
+ return d
+ d.addCallback(lambda ign: self._share.unlink())
+ def _unlinked(ign):
+ self._share = None
- # We are now considered closed for further writing. We must tell
- # the storage server about this so that it stops expecting us to
- # use the space it allocated for us earlier.
- self.closed = True
- self.ss.bucket_writer_closed(self, 0)
+ # We are now considered closed for further writing. We must tell
+ # the storage server about this so that it stops expecting us to
+ # use the space it allocated for us earlier.
+ self.closed = True
+ self.ss.bucket_writer_closed(self, 0)
+ d.addCallback(_unlinked)
+ return d
class BucketReader(Referenceable):
implements(RIBucketReader)
- def __init__(self, ss, sharefname, storage_index=None, shnum=None):
- self.ss = ss
- self._share_file = ShareFile(sharefname)
- self.storage_index = storage_index
- self.shnum = shnum
+ def __init__(self, account, share):
+ self.ss = account.server
+ self._account = account
+ self._share = share
+ self.storage_index = share.get_storage_index()
+ self.shnum = share.get_shnum()
def __repr__(self):
return "<%s %s %s>" % (self.__class__.__name__,
base32.b2a_l(self.storage_index[:8], 60),
self.shnum)
+ def _add_latency(self, res, name, start):
+ self.ss.add_latency(name, time.time() - start)
+ self.ss.count(name)
+ return res
+
def remote_read(self, offset, length):
start = time.time()
- data = self._share_file.read_share_data(offset, length)
- self.ss.add_latency("read", time.time() - start)
- self.ss.count("read")
- return data
+ d = self._share.read_share_data(offset, length)
+ d.addBoth(self._add_latency, "read", start)
+ return d
def remote_advise_corrupt_share(self, reason):
- return self.ss.client_advise_corrupt_share("immutable",
- self.storage_index,
- self.shnum,
- reason)
+ return self._account.remote_advise_corrupt_share("immutable",
+ self.storage_index,
+ self.shnum,
+ reason)
+++ /dev/null
-#! /usr/bin/python
-
-from allmydata.storage.backends.disk.mutable import MutableShareFile
-from allmydata.storage.backends.disk.immutable import ShareFile
-
-def get_share_file(filename):
- f = open(filename, "rb")
- prefix = f.read(32)
- f.close()
- if prefix == MutableShareFile.MAGIC:
- return MutableShareFile(filename)
- # otherwise assume it's immutable
- return ShareFile(filename)
-
from allmydata.storage_client import StubServer
from allmydata.mutable.layout import unpack_header
from allmydata.mutable.publish import MutableData
-from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableDiskShare
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data
def _corrupt_mutable_share_data(data, debug=False):
prefix = data[:32]
- assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
- data_offset = MutableShareFile.DATA_OFFSET
+ assert prefix == MutableDiskShare.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableDiskShare.MAGIC)
+ data_offset = MutableDiskShare.DATA_OFFSET
sharetype = data[data_offset:data_offset+1]
assert sharetype == "\x00", "non-SDMF mutable shares not supported"
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,