'allmydata.mutable',
'allmydata.scripts',
'allmydata.storage',
+ 'allmydata.storage.backends',
+ 'allmydata.storage.backends.cloud',
+ 'allmydata.storage.backends.cloud.s3',
+ 'allmydata.storage.backends.disk',
+ 'allmydata.storage.backends.null',
'allmydata.test',
'allmydata.util',
'allmydata.web',
self['filename'] = argv_to_abspath(filename)
def dump_share(options):
- from allmydata.storage.mutable import MutableShareFile
+ from allmydata.storage.backends.disk.mutable import MutableShareFile
from allmydata.util.encodingutil import quote_output
out = options.stdout
return dump_immutable_share(options)
def dump_immutable_share(options):
- from allmydata.storage.immutable import ShareFile
+ from allmydata.storage.backends.disk.immutable import ShareFile
out = options.stdout
f = ShareFile(options['filename'])
def dump_mutable_share(options):
- from allmydata.storage.mutable import MutableShareFile
+ from allmydata.storage.backends.disk.mutable import MutableShareFile
from allmydata.util import base32, idlib
out = options.stdout
m = MutableShareFile(options['filename'])
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri
- from allmydata.storage.mutable import MutableShareFile
- from allmydata.storage.immutable import ShareFile
+ from allmydata.storage.backends.disk.mutable import MutableShareFile
+ from allmydata.storage.backends.disk.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
def corrupt_share(options):
import random
- from allmydata.storage.mutable import MutableShareFile
- from allmydata.storage.immutable import ShareFile
+ from allmydata.storage.backends.disk.mutable import MutableShareFile
+ from allmydata.storage.backends.disk.immutable import ShareFile
from allmydata.mutable.layout import unpack_header
from allmydata.immutable.layout import ReadBucketProxy
out = options.stdout
--- /dev/null
+
+import os, struct, time
+
+from foolscap.api import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIBucketWriter, RIBucketReader
+from allmydata.util import base32, fileutil, log
+from allmydata.util.fileutil import get_used_space
+from allmydata.util.assertutil import precondition
+from allmydata.storage.common import UnknownImmutableContainerVersionError, \
+ DataTooLargeError
+from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
+
+
+# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
+# can be accessed by RIBucketWriter.write and RIBucketReader.read .
+
+# The share file has the following layout:
+# 0x00: share file version number, four bytes, current version is 1
+# 0x04: share data length, four bytes big-endian # Footnote 1
+# 0x08: number of leases, four bytes big-endian = N # Footnote 2
+# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
+# filesize - 72*N: leases (ignored). Each lease is 72 bytes.
+
+# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers.
+
+# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers.
+# New shares will have a 0 here. Old shares will have whatever value was left
+# over when the server was upgraded. All lease information is now kept in the
+# leasedb.
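+
+# A minimal sketch (not part of the storage API) of reading this header back,
+# assuming `path` names an existing immutable share file:
+#
+#   with open(path, 'rb') as f:
+#       (version, old_length, num_leases) = struct.unpack(">LLL", f.read(12))
+#
+# `old_length` is the legacy data-length field (Footnote 1); ShareFile below
+# checks `version` and uses `num_leases` only to subtract any legacy lease
+# area from the end of the file when computing the data length.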
+
+
+class ShareFile:
+ sharetype = "immutable"
+ LEASE_SIZE = struct.calcsize(">L32s32sL")
+ HEADER = ">LLL"
+ HEADER_SIZE = struct.calcsize(HEADER)
+ DATA_OFFSET = HEADER_SIZE
+
+ def __init__(self, filename, max_size=None, create=False):
+ """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
+ precondition((max_size is not None) or (not create), max_size, create)
+ self.home = filename
+ self._max_size = max_size
+ if create:
+ # touch the file, so later callers will see that we're working on
+ # it. Also construct the metadata.
+ assert not os.path.exists(self.home)
+ fileutil.make_dirs(os.path.dirname(self.home))
+ f = open(self.home, 'wb')
+ # The second field -- the four-byte share data length -- is no
+ # longer used as of Tahoe v1.3.0, but we continue to write it in
+ # there in case someone downgrades a storage server from >=
+ # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
+ # server to another, etc. We do saturation -- a share data length
+ # larger than 2**32-1 (what can fit into the field) is marked as
+ # the largest length that can fit into the field. That way, even
+ # if this does happen, the old < v1.3.0 server will still allow
+ # clients to read the first part of the share.
+ f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
+ f.close()
+ self._data_length = max_size
+ else:
+ f = open(self.home, 'rb')
+ try:
+ (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
+ finally:
+ f.close()
+ if version != 1:
+ msg = "sharefile %s had version %d but we wanted 1" % \
+ (filename, version)
+ raise UnknownImmutableContainerVersionError(msg)
+
+ filesize = os.stat(self.home).st_size
+ self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
+
+ # TODO: raise a better exception.
+ assert self._data_length >= 0, self._data_length
+
+ def get_used_space(self):
+ return get_used_space(self.home)
+
+ def unlink(self):
+ os.unlink(self.home)
+
+ def get_size(self):
+ return os.stat(self.home).st_size
+
+ def read_share_data(self, offset, length):
+ precondition(offset >= 0)
+
+ # Reads beyond the end of the data are truncated. Reads that start
+ # beyond the end of the data return an empty string.
+ seekpos = self.DATA_OFFSET + offset
+ actuallength = max(0, min(length, self._data_length - offset))
+ if actuallength == 0:
+ return ""
+ f = open(self.home, 'rb')
+ try:
+ f.seek(seekpos)
+ return f.read(actuallength)
+ finally:
+ f.close()
+
+ def write_share_data(self, offset, data):
+ length = len(data)
+ precondition(offset >= 0, offset)
+ if self._max_size is not None and offset+length > self._max_size:
+ raise DataTooLargeError(self._max_size, offset, length)
+ f = open(self.home, 'rb+')
+ try:
+ real_offset = self.DATA_OFFSET + offset
+ f.seek(real_offset)
+ assert f.tell() == real_offset
+ f.write(data)
+ finally:
+ f.close()
+
+
+class BucketWriter(Referenceable):
+ implements(RIBucketWriter)
+
+ def __init__(self, ss, account, storage_index, shnum,
+ incominghome, finalhome, max_size, canary):
+ self.ss = ss
+ self.incominghome = incominghome
+ self.finalhome = finalhome
+ self._max_size = max_size # don't allow the client to write more than this
+ self._account = account
+ self._storage_index = storage_index
+ self._shnum = shnum
+ self._canary = canary
+ self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
+ self.closed = False
+ self.throw_out_all_data = False
+ self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
+ self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
+
+ def allocated_size(self):
+ return self._max_size
+
+ def remote_write(self, offset, data):
+ start = time.time()
+ precondition(not self.closed)
+ if self.throw_out_all_data:
+ return
+ self._sharefile.write_share_data(offset, data)
+ self.ss.add_latency("write", time.time() - start)
+ self.ss.count("write")
+
+ def remote_close(self):
+ precondition(not self.closed)
+ start = time.time()
+
+ fileutil.make_dirs(os.path.dirname(self.finalhome))
+ fileutil.rename(self.incominghome, self.finalhome)
+ try:
+ # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
+ # We try to delete the parent (.../ab/abcde) to avoid leaving
+ # these directories lying around forever, but the delete might
+ # fail if we're working on another share for the same storage
+ # index (like ab/abcde/5). The alternative approach would be to
+ # use a hierarchy of objects (PrefixHolder, BucketHolder,
+ # ShareWriter), each of which is responsible for a single
+ # directory on disk, and have them use reference counting of
+ # their children to know when they should do the rmdir. This
+ # approach is simpler, but relies on os.rmdir refusing to delete
+ # a non-empty directory. Do *not* use fileutil.rm_dir() here!
+ os.rmdir(os.path.dirname(self.incominghome))
+ # we also delete the grandparent (prefix) directory, .../ab ,
+ # again to avoid leaving directories lying around. This might
+ # fail if there is another bucket open that shares a prefix (like
+ # ab/abfff).
+ os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
+ # we leave the great-grandparent (incoming/) directory in place.
+ except EnvironmentError:
+ # ignore the "can't rmdir because the directory is not empty"
+ # exceptions, those are normal consequences of the
+ # above-mentioned conditions.
+ pass
+ self._sharefile = None
+ self.closed = True
+ self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+ filelen = get_used_space(self.finalhome)
+ self.ss.bucket_writer_closed(self, filelen)
+ self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
+ self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
+ self.ss.add_latency("close", time.time() - start)
+ self.ss.count("close")
+
+ def _disconnected(self):
+ if not self.closed:
+ self._abort()
+
+ def remote_abort(self):
+ log.msg("storage: aborting sharefile %s" % self.incominghome,
+ facility="tahoe.storage", level=log.UNUSUAL)
+ if not self.closed:
+ self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+ self._abort()
+ self.ss.count("abort")
+
+ def _abort(self):
+ if self.closed:
+ return
+
+ os.remove(self.incominghome)
+ # if we were the last share to be moved, remove the incoming/
+ # directory that was our parent
+ parentdir = os.path.split(self.incominghome)[0]
+ if not os.listdir(parentdir):
+ os.rmdir(parentdir)
+ self._sharefile = None
+ self._account.remove_share_and_leases(self._storage_index, self._shnum)
+
+ # We are now considered closed for further writing. We must tell
+ # the storage server about this so that it stops expecting us to
+ # use the space it allocated for us earlier.
+ self.closed = True
+ self.ss.bucket_writer_closed(self, 0)
+
+
+class BucketReader(Referenceable):
+ implements(RIBucketReader)
+
+ def __init__(self, ss, sharefname, storage_index=None, shnum=None):
+ self.ss = ss
+ self._share_file = ShareFile(sharefname)
+ self.storage_index = storage_index
+ self.shnum = shnum
+
+ def __repr__(self):
+ return "<%s %s %s>" % (self.__class__.__name__,
+ base32.b2a_l(self.storage_index[:8], 60),
+ self.shnum)
+
+ def remote_read(self, offset, length):
+ start = time.time()
+ data = self._share_file.read_share_data(offset, length)
+ self.ss.add_latency("read", time.time() - start)
+ self.ss.count("read")
+ return data
+
+ def remote_advise_corrupt_share(self, reason):
+ return self.ss.client_advise_corrupt_share("immutable",
+ self.storage_index,
+ self.shnum,
+ reason)
--- /dev/null
+
+import os, struct
+
+from allmydata.interfaces import BadWriteEnablerError
+from allmydata.util import idlib, log
+from allmydata.util.assertutil import precondition
+from allmydata.util.hashutil import timing_safe_compare
+from allmydata.util.fileutil import get_used_space
+from allmydata.storage.common import UnknownMutableContainerVersionError, \
+ DataTooLargeError
+from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+
+
+# the MutableShareFile is like the ShareFile, but used for mutable data. It
+# has a different layout. See docs/mutable.txt for more details.
+
+#  #   offset    size    name
+#  1   0         32      magic verstr "tahoe mutable container v1" plus binary
+#  2   32        20      write enabler's nodeid
+#  3   52        32      write enabler
+#  4   84        8       data size (actual share data present) (a)
+#  5   92        8       offset of (8) count of extra leases (after data)
+#  6   100       368     four leases, 92 bytes each
+#                         0    4   ownerid (0 means "no lease here")
+#                         4    4   expiration timestamp
+#                         8   32   renewal token
+#                        40   32   cancel token
+#                        72   20   nodeid which accepted the tokens
+#  7   468       (a)     data
+#  8   ??        4       count of extra leases
+#  9   ??        n*92    extra leases
+
+
+# The struct module doc says that L's are 4 bytes in size, and that Q's are
+# 8 bytes in size. Since compatibility depends upon this, double-check it.
+assert struct.calcsize(">L") == 4, struct.calcsize(">L")
+assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
+
+class MutableShareFile:
+
+ sharetype = "mutable"
+ DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
+ HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
+ LEASE_SIZE = struct.calcsize(">LL32s32s20s")
+ assert LEASE_SIZE == 92
+ DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+ assert DATA_OFFSET == 468, DATA_OFFSET
+    # our sharefiles start with a recognizable string, plus some random
+ # binary data to reduce the chance that a regular text file will look
+ # like a sharefile.
+ MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+ assert len(MAGIC) == 32
+ MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
+ # TODO: decide upon a policy for max share size
+
+ def __init__(self, filename, parent=None):
+ self.home = filename
+ if os.path.exists(self.home):
+ # we don't cache anything, just check the magic
+ f = open(self.home, 'rb')
+ data = f.read(self.HEADER_SIZE)
+ (magic,
+ write_enabler_nodeid, write_enabler,
+ data_length, extra_lease_offset) = \
+ struct.unpack(">32s20s32sQQ", data)
+ if magic != self.MAGIC:
+ msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
+ (filename, magic, self.MAGIC)
+ raise UnknownMutableContainerVersionError(msg)
+ self.parent = parent # for logging
+
+ def log(self, *args, **kwargs):
+ return self.parent.log(*args, **kwargs)
+
+ def create(self, my_nodeid, write_enabler):
+ assert not os.path.exists(self.home)
+ data_length = 0
+ extra_lease_offset = (self.HEADER_SIZE
+ + 4 * self.LEASE_SIZE
+ + data_length)
+ assert extra_lease_offset == self.DATA_OFFSET # true at creation
+ num_extra_leases = 0
+ f = open(self.home, 'wb')
+ header = struct.pack(">32s20s32sQQ",
+ self.MAGIC, my_nodeid, write_enabler,
+ data_length, extra_lease_offset,
+ )
+ leases = ("\x00"*self.LEASE_SIZE) * 4
+ f.write(header + leases)
+ # data goes here, empty after creation
+ f.write(struct.pack(">L", num_extra_leases))
+ # extra leases go here, none at creation
+ f.close()
+
+ def get_used_space(self):
+ return get_used_space(self.home)
+
+ def unlink(self):
+ os.unlink(self.home)
+
+ def get_size(self):
+ return os.stat(self.home).st_size
+
+ def _read_data_length(self, f):
+ f.seek(self.DATA_LENGTH_OFFSET)
+ (data_length,) = struct.unpack(">Q", f.read(8))
+ return data_length
+
+ def _read_container_size(self, f):
+ f.seek(self.DATA_LENGTH_OFFSET + 8)
+ (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
+ return extra_lease_offset - self.DATA_OFFSET
+
+ def _write_data_length(self, f, data_length):
+ extra_lease_offset = self.DATA_OFFSET + data_length
+ f.seek(self.DATA_LENGTH_OFFSET)
+ f.write(struct.pack(">QQ", data_length, extra_lease_offset))
+ f.seek(extra_lease_offset)
+ f.write(struct.pack(">L", 0))
+
+ def _read_share_data(self, f, offset, length):
+ precondition(offset >= 0)
+ data_length = self._read_data_length(f)
+ if offset+length > data_length:
+ # reads beyond the end of the data are truncated. Reads that
+ # start beyond the end of the data return an empty string.
+ length = max(0, data_length-offset)
+ if length == 0:
+ return ""
+ precondition(offset+length <= data_length)
+ f.seek(self.DATA_OFFSET+offset)
+ data = f.read(length)
+ return data
+
+ def _write_share_data(self, f, offset, data):
+ length = len(data)
+ precondition(offset >= 0)
+ data_length = self._read_data_length(f)
+
+ if offset+length >= data_length:
+ # They are expanding their data size.
+
+ if offset+length > self.MAX_SIZE:
+ raise DataTooLargeError()
+
+ # Their data now fits in the current container. We must write
+ # their new data and modify the recorded data size.
+
+ # Fill any newly exposed empty space with 0's.
+ if offset > data_length:
+ f.seek(self.DATA_OFFSET+data_length)
+ f.write('\x00'*(offset - data_length))
+ f.flush()
+
+ new_data_length = offset+length
+ self._write_data_length(f, new_data_length)
+ # an interrupt here will result in a corrupted share
+
+ # now all that's left to do is write out their data
+ f.seek(self.DATA_OFFSET+offset)
+ f.write(data)
+ return
+
+ def _read_write_enabler_and_nodeid(self, f):
+ f.seek(0)
+ data = f.read(self.HEADER_SIZE)
+ (magic,
+ write_enabler_nodeid, write_enabler,
+         data_length, extra_lease_offset) = \
+ struct.unpack(">32s20s32sQQ", data)
+ assert magic == self.MAGIC
+ return (write_enabler, write_enabler_nodeid)
+
+ def readv(self, readv):
+ datav = []
+ f = open(self.home, 'rb')
+ for (offset, length) in readv:
+ datav.append(self._read_share_data(f, offset, length))
+ f.close()
+ return datav
+
+ def check_write_enabler(self, write_enabler, si_s):
+ f = open(self.home, 'rb+')
+ (real_write_enabler, write_enabler_nodeid) = \
+ self._read_write_enabler_and_nodeid(f)
+ f.close()
+ # avoid a timing attack
+ #if write_enabler != real_write_enabler:
+ if not timing_safe_compare(write_enabler, real_write_enabler):
+            # accommodate share migration by reporting the nodeid used for the
+ # old write enabler.
+ self.log(format="bad write enabler on SI %(si)s,"
+ " recorded by nodeid %(nodeid)s",
+ facility="tahoe.storage",
+ level=log.WEIRD, umid="cE1eBQ",
+ si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
+ msg = "The write enabler was recorded by nodeid '%s'." % \
+ (idlib.nodeid_b2a(write_enabler_nodeid),)
+ raise BadWriteEnablerError(msg)
+
+ def check_testv(self, testv):
+ test_good = True
+ f = open(self.home, 'rb+')
+ for (offset, length, operator, specimen) in testv:
+ data = self._read_share_data(f, offset, length)
+ if not testv_compare(data, operator, specimen):
+ test_good = False
+ break
+ f.close()
+ return test_good
+
+ def writev(self, datav, new_length):
+ f = open(self.home, 'rb+')
+ for (offset, data) in datav:
+ self._write_share_data(f, offset, data)
+ if new_length is not None:
+ cur_length = self._read_data_length(f)
+ if new_length < cur_length:
+ self._write_data_length(f, new_length)
+ # TODO: shrink the share file.
+ f.close()
+
+def testv_compare(a, op, b):
+ assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+ if op == "lt":
+ return a < b
+ if op == "le":
+ return a <= b
+ if op == "eq":
+ return a == b
+ if op == "ne":
+ return a != b
+ if op == "ge":
+ return a >= b
+ if op == "gt":
+ return a > b
+ # never reached
+
+class EmptyShare:
+
+ def check_testv(self, testv):
+ test_good = True
+ for (offset, length, operator, specimen) in testv:
+ data = ""
+ if not testv_compare(data, operator, specimen):
+ test_good = False
+ break
+ return test_good
+
+def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
+ ms = MutableShareFile(filename, parent)
+ ms.create(my_nodeid, write_enabler)
+ del ms
+ return MutableShareFile(filename, parent)
+
+++ /dev/null
-
-import os, struct, time
-
-from foolscap.api import Referenceable
-
-from zope.interface import implements
-from allmydata.interfaces import RIBucketWriter, RIBucketReader
-from allmydata.util import base32, fileutil, log
-from allmydata.util.fileutil import get_used_space
-from allmydata.util.assertutil import precondition
-from allmydata.storage.common import UnknownImmutableContainerVersionError, \
- DataTooLargeError
-from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
-
-
-# Each share file (in storage/shares/$SI/$SHNUM) contains share data that
-# can be accessed by RIBucketWriter.write and RIBucketReader.read .
-
-# The share file has the following layout:
-# 0x00: share file version number, four bytes, current version is 1
-# 0x04: share data length, four bytes big-endian # Footnote 1
-# 0x08: number of leases, four bytes big-endian = N # Footnote 2
-# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
-# filesize - 72*N: leases (ignored). Each lease is 72 bytes.
-
-# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers.
-
-# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers.
-# New shares will have a 0 here. Old shares will have whatever value was left
-# over when the server was upgraded. All lease information is now kept in the
-# leasedb.
-
-
-class ShareFile:
- sharetype = "immutable"
- LEASE_SIZE = struct.calcsize(">L32s32sL")
- HEADER = ">LLL"
- HEADER_SIZE = struct.calcsize(HEADER)
- DATA_OFFSET = HEADER_SIZE
-
- def __init__(self, filename, max_size=None, create=False):
- """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
- precondition((max_size is not None) or (not create), max_size, create)
- self.home = filename
- self._max_size = max_size
- if create:
- # touch the file, so later callers will see that we're working on
- # it. Also construct the metadata.
- assert not os.path.exists(self.home)
- fileutil.make_dirs(os.path.dirname(self.home))
- f = open(self.home, 'wb')
- # The second field -- the four-byte share data length -- is no
- # longer used as of Tahoe v1.3.0, but we continue to write it in
- # there in case someone downgrades a storage server from >=
- # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
- # server to another, etc. We do saturation -- a share data length
- # larger than 2**32-1 (what can fit into the field) is marked as
- # the largest length that can fit into the field. That way, even
- # if this does happen, the old < v1.3.0 server will still allow
- # clients to read the first part of the share.
- f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
- f.close()
- self._data_length = max_size
- else:
- f = open(self.home, 'rb')
- try:
- (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE))
- finally:
- f.close()
- if version != 1:
- msg = "sharefile %s had version %d but we wanted 1" % \
- (filename, version)
- raise UnknownImmutableContainerVersionError(msg)
-
- filesize = os.stat(self.home).st_size
- self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE)
-
- # TODO: raise a better exception.
- assert self._data_length >= 0, self._data_length
-
- def get_used_space(self):
- return get_used_space(self.home)
-
- def unlink(self):
- os.unlink(self.home)
-
- def get_size(self):
- return os.stat(self.home).st_size
-
- def read_share_data(self, offset, length):
- precondition(offset >= 0)
-
- # Reads beyond the end of the data are truncated. Reads that start
- # beyond the end of the data return an empty string.
- seekpos = self.DATA_OFFSET + offset
- actuallength = max(0, min(length, self._data_length - offset))
- if actuallength == 0:
- return ""
- f = open(self.home, 'rb')
- try:
- f.seek(seekpos)
- return f.read(actuallength)
- finally:
- f.close()
-
- def write_share_data(self, offset, data):
- length = len(data)
- precondition(offset >= 0, offset)
- if self._max_size is not None and offset+length > self._max_size:
- raise DataTooLargeError(self._max_size, offset, length)
- f = open(self.home, 'rb+')
- try:
- real_offset = self.DATA_OFFSET + offset
- f.seek(real_offset)
- assert f.tell() == real_offset
- f.write(data)
- finally:
- f.close()
-
-
-class BucketWriter(Referenceable):
- implements(RIBucketWriter)
-
- def __init__(self, ss, account, storage_index, shnum,
- incominghome, finalhome, max_size, canary):
- self.ss = ss
- self.incominghome = incominghome
- self.finalhome = finalhome
- self._max_size = max_size # don't allow the client to write more than this
- self._account = account
- self._storage_index = storage_index
- self._shnum = shnum
- self._canary = canary
- self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
- self.closed = False
- self.throw_out_all_data = False
- self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
- self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
-
- def allocated_size(self):
- return self._max_size
-
- def remote_write(self, offset, data):
- start = time.time()
- precondition(not self.closed)
- if self.throw_out_all_data:
- return
- self._sharefile.write_share_data(offset, data)
- self.ss.add_latency("write", time.time() - start)
- self.ss.count("write")
-
- def remote_close(self):
- precondition(not self.closed)
- start = time.time()
-
- fileutil.make_dirs(os.path.dirname(self.finalhome))
- fileutil.rename(self.incominghome, self.finalhome)
- try:
- # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
- # We try to delete the parent (.../ab/abcde) to avoid leaving
- # these directories lying around forever, but the delete might
- # fail if we're working on another share for the same storage
- # index (like ab/abcde/5). The alternative approach would be to
- # use a hierarchy of objects (PrefixHolder, BucketHolder,
- # ShareWriter), each of which is responsible for a single
- # directory on disk, and have them use reference counting of
- # their children to know when they should do the rmdir. This
- # approach is simpler, but relies on os.rmdir refusing to delete
- # a non-empty directory. Do *not* use fileutil.rm_dir() here!
- os.rmdir(os.path.dirname(self.incominghome))
- # we also delete the grandparent (prefix) directory, .../ab ,
- # again to avoid leaving directories lying around. This might
- # fail if there is another bucket open that shares a prefix (like
- # ab/abfff).
- os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
- # we leave the great-grandparent (incoming/) directory in place.
- except EnvironmentError:
- # ignore the "can't rmdir because the directory is not empty"
- # exceptions, those are normal consequences of the
- # above-mentioned conditions.
- pass
- self._sharefile = None
- self.closed = True
- self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
- filelen = get_used_space(self.finalhome)
- self.ss.bucket_writer_closed(self, filelen)
- self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
- self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
- self.ss.add_latency("close", time.time() - start)
- self.ss.count("close")
-
- def _disconnected(self):
- if not self.closed:
- self._abort()
-
- def remote_abort(self):
- log.msg("storage: aborting sharefile %s" % self.incominghome,
- facility="tahoe.storage", level=log.UNUSUAL)
- if not self.closed:
- self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
- self._abort()
- self.ss.count("abort")
-
- def _abort(self):
- if self.closed:
- return
-
- os.remove(self.incominghome)
- # if we were the last share to be moved, remove the incoming/
- # directory that was our parent
- parentdir = os.path.split(self.incominghome)[0]
- if not os.listdir(parentdir):
- os.rmdir(parentdir)
- self._sharefile = None
- self._account.remove_share_and_leases(self._storage_index, self._shnum)
-
- # We are now considered closed for further writing. We must tell
- # the storage server about this so that it stops expecting us to
- # use the space it allocated for us earlier.
- self.closed = True
- self.ss.bucket_writer_closed(self, 0)
-
-
-class BucketReader(Referenceable):
- implements(RIBucketReader)
-
- def __init__(self, ss, sharefname, storage_index=None, shnum=None):
- self.ss = ss
- self._share_file = ShareFile(sharefname)
- self.storage_index = storage_index
- self.shnum = shnum
-
- def __repr__(self):
- return "<%s %s %s>" % (self.__class__.__name__,
- base32.b2a_l(self.storage_index[:8], 60),
- self.shnum)
-
- def remote_read(self, offset, length):
- start = time.time()
- data = self._share_file.read_share_data(offset, length)
- self.ss.add_latency("read", time.time() - start)
- self.ss.count("read")
- return data
-
- def remote_advise_corrupt_share(self, reason):
- return self.ss.client_advise_corrupt_share("immutable",
- self.storage_index,
- self.shnum,
- reason)
+++ /dev/null
-
-import os, struct
-
-from allmydata.interfaces import BadWriteEnablerError
-from allmydata.util import idlib, log
-from allmydata.util.assertutil import precondition
-from allmydata.util.hashutil import timing_safe_compare
-from allmydata.util.fileutil import get_used_space
-from allmydata.storage.common import UnknownMutableContainerVersionError, \
- DataTooLargeError
-from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-
-
-# the MutableShareFile is like the ShareFile, but used for mutable data. It
-# has a different layout. See docs/mutable.txt for more details.
-
-#  #   offset    size    name
-#  1   0         32      magic verstr "tahoe mutable container v1" plus binary
-#  2   32        20      write enabler's nodeid
-#  3   52        32      write enabler
-#  4   84        8       data size (actual share data present) (a)
-#  5   92        8       offset of (8) count of extra leases (after data)
-#  6   100       368     four leases, 92 bytes each
-#                         0    4   ownerid (0 means "no lease here")
-#                         4    4   expiration timestamp
-#                         8   32   renewal token
-#                        40   32   cancel token
-#                        72   20   nodeid which accepted the tokens
-#  7   468       (a)     data
-#  8   ??        4       count of extra leases
-#  9   ??        n*92    extra leases
-
-
-# The struct module doc says that L's are 4 bytes in size., and that Q's are
-# 8 bytes in size. Since compatibility depends upon this, double-check it.
-assert struct.calcsize(">L") == 4, struct.calcsize(">L")
-assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
-
-class MutableShareFile:
-
- sharetype = "mutable"
- DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
- HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
- LEASE_SIZE = struct.calcsize(">LL32s32s20s")
- assert LEASE_SIZE == 92
- DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
- assert DATA_OFFSET == 468, DATA_OFFSET
- # our sharefiles share with a recognizable string, plus some random
- # binary data to reduce the chance that a regular text file will look
- # like a sharefile.
- MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
- assert len(MAGIC) == 32
- MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
- # TODO: decide upon a policy for max share size
-
- def __init__(self, filename, parent=None):
- self.home = filename
- if os.path.exists(self.home):
- # we don't cache anything, just check the magic
- f = open(self.home, 'rb')
- data = f.read(self.HEADER_SIZE)
- (magic,
- write_enabler_nodeid, write_enabler,
- data_length, extra_lease_offset) = \
- struct.unpack(">32s20s32sQQ", data)
- if magic != self.MAGIC:
- msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
- (filename, magic, self.MAGIC)
- raise UnknownMutableContainerVersionError(msg)
- self.parent = parent # for logging
-
- def log(self, *args, **kwargs):
- return self.parent.log(*args, **kwargs)
-
- def create(self, my_nodeid, write_enabler):
- assert not os.path.exists(self.home)
- data_length = 0
- extra_lease_offset = (self.HEADER_SIZE
- + 4 * self.LEASE_SIZE
- + data_length)
- assert extra_lease_offset == self.DATA_OFFSET # true at creation
- num_extra_leases = 0
- f = open(self.home, 'wb')
- header = struct.pack(">32s20s32sQQ",
- self.MAGIC, my_nodeid, write_enabler,
- data_length, extra_lease_offset,
- )
- leases = ("\x00"*self.LEASE_SIZE) * 4
- f.write(header + leases)
- # data goes here, empty after creation
- f.write(struct.pack(">L", num_extra_leases))
- # extra leases go here, none at creation
- f.close()
-
- def get_used_space(self):
- return get_used_space(self.home)
-
- def unlink(self):
- os.unlink(self.home)
-
- def get_size(self):
- return os.stat(self.home).st_size
-
- def _read_data_length(self, f):
- f.seek(self.DATA_LENGTH_OFFSET)
- (data_length,) = struct.unpack(">Q", f.read(8))
- return data_length
-
- def _read_container_size(self, f):
- f.seek(self.DATA_LENGTH_OFFSET + 8)
- (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
- return extra_lease_offset - self.DATA_OFFSET
-
- def _write_data_length(self, f, data_length):
- extra_lease_offset = self.DATA_OFFSET + data_length
- f.seek(self.DATA_LENGTH_OFFSET)
- f.write(struct.pack(">QQ", data_length, extra_lease_offset))
- f.seek(extra_lease_offset)
- f.write(struct.pack(">L", 0))
-
- def _read_share_data(self, f, offset, length):
- precondition(offset >= 0)
- data_length = self._read_data_length(f)
- if offset+length > data_length:
- # reads beyond the end of the data are truncated. Reads that
- # start beyond the end of the data return an empty string.
- length = max(0, data_length-offset)
- if length == 0:
- return ""
- precondition(offset+length <= data_length)
- f.seek(self.DATA_OFFSET+offset)
- data = f.read(length)
- return data
-
- def _write_share_data(self, f, offset, data):
- length = len(data)
- precondition(offset >= 0)
- data_length = self._read_data_length(f)
-
- if offset+length >= data_length:
- # They are expanding their data size.
-
- if offset+length > self.MAX_SIZE:
- raise DataTooLargeError()
-
- # Their data now fits in the current container. We must write
- # their new data and modify the recorded data size.
-
- # Fill any newly exposed empty space with 0's.
- if offset > data_length:
- f.seek(self.DATA_OFFSET+data_length)
- f.write('\x00'*(offset - data_length))
- f.flush()
-
- new_data_length = offset+length
- self._write_data_length(f, new_data_length)
- # an interrupt here will result in a corrupted share
-
- # now all that's left to do is write out their data
- f.seek(self.DATA_OFFSET+offset)
- f.write(data)
- return
-
- def _read_write_enabler_and_nodeid(self, f):
- f.seek(0)
- data = f.read(self.HEADER_SIZE)
- (magic,
- write_enabler_nodeid, write_enabler,
- data_length, extra_least_offset) = \
- struct.unpack(">32s20s32sQQ", data)
- assert magic == self.MAGIC
- return (write_enabler, write_enabler_nodeid)
-
- def readv(self, readv):
- datav = []
- f = open(self.home, 'rb')
- for (offset, length) in readv:
- datav.append(self._read_share_data(f, offset, length))
- f.close()
- return datav
-
- def check_write_enabler(self, write_enabler, si_s):
- f = open(self.home, 'rb+')
- (real_write_enabler, write_enabler_nodeid) = \
- self._read_write_enabler_and_nodeid(f)
- f.close()
- # avoid a timing attack
- #if write_enabler != real_write_enabler:
- if not timing_safe_compare(write_enabler, real_write_enabler):
- # accomodate share migration by reporting the nodeid used for the
- # old write enabler.
- self.log(format="bad write enabler on SI %(si)s,"
- " recorded by nodeid %(nodeid)s",
- facility="tahoe.storage",
- level=log.WEIRD, umid="cE1eBQ",
- si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
- msg = "The write enabler was recorded by nodeid '%s'." % \
- (idlib.nodeid_b2a(write_enabler_nodeid),)
- raise BadWriteEnablerError(msg)
-
- def check_testv(self, testv):
- test_good = True
- f = open(self.home, 'rb+')
- for (offset, length, operator, specimen) in testv:
- data = self._read_share_data(f, offset, length)
- if not testv_compare(data, operator, specimen):
- test_good = False
- break
- f.close()
- return test_good
-
- def writev(self, datav, new_length):
- f = open(self.home, 'rb+')
- for (offset, data) in datav:
- self._write_share_data(f, offset, data)
- if new_length is not None:
- cur_length = self._read_data_length(f)
- if new_length < cur_length:
- self._write_data_length(f, new_length)
- # TODO: shrink the share file.
- f.close()
-
-def testv_compare(a, op, b):
- assert op in ("lt", "le", "eq", "ne", "ge", "gt")
- if op == "lt":
- return a < b
- if op == "le":
- return a <= b
- if op == "eq":
- return a == b
- if op == "ne":
- return a != b
- if op == "ge":
- return a >= b
- if op == "gt":
- return a > b
- # never reached
-
-class EmptyShare:
-
- def check_testv(self, testv):
- test_good = True
- for (offset, length, operator, specimen) in testv:
- data = ""
- if not testv_compare(data, operator, specimen):
- test_good = False
- break
- return test_good
-
-def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
- ms = MutableShareFile(filename, parent)
- ms.create(my_nodeid, write_enabler)
- del ms
- return MutableShareFile(filename, parent)
-
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
-from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
+from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+from allmydata.storage.backends.disk.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.accountant import Accountant
from allmydata.storage.expiration import ExpirationPolicy
#! /usr/bin/python
-from allmydata.storage.mutable import MutableShareFile
-from allmydata.storage.immutable import ShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.immutable import ShareFile
def get_share_file(filename):
f = open(filename, "rb")
from allmydata.storage_client import StubServer
from allmydata.mutable.layout import unpack_header
from allmydata.mutable.publish import MutableData
-from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, time_format
from allmydata.storage.server import StorageServer
-from allmydata.storage.mutable import MutableShareFile
-from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
+from allmydata.storage.backends.disk.immutable import BucketWriter, BucketReader, ShareFile
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE
import allmydata
from allmydata import uri
-from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.backends.disk.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode