From: Daira Hopwood Date: Fri, 17 Apr 2015 18:04:40 +0000 (+0100) Subject: Main leasedb changes (rebased). X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/%22doc.html/README.win32?a=commitdiff_plain;h=60597da237f51f75068043bc6ef609c3e264681d;p=tahoe-lafs%2Ftahoe-lafs.git Main leasedb changes (rebased). Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 88530dcd..c2d1b3b5 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -9,6 +9,7 @@ from pycryptopp.publickey import rsa import allmydata from allmydata.storage.server import StorageServer +from allmydata.storage.expiration import ExpirationPolicy from allmydata import storage_client from allmydata.immutable.upload import Uploader from allmydata.immutable.offloaded import Helper @@ -256,6 +257,7 @@ class Client(node.Node, pollmixin.PollMixin): return seed.strip() def init_storage(self): + self.accountant = None # should we run a storage server (and publish it for others to use)? if not self.get_config("storage", "enabled", True, boolean=True): return @@ -294,25 +296,27 @@ class Client(node.Node, pollmixin.PollMixin): raise OldConfigOptionError("[storage]expire.immutable = False is no longer supported.") if not self.get_config("storage", "expire.mutable", True, boolean=True): raise OldConfigOptionError("[storage]expire.mutable = False is no longer supported.") - expiration_sharetypes = ('mutable', 'immutable') + + expiration_policy = ExpirationPolicy(enabled=expire, mode=mode, override_lease_duration=o_l_d, + cutoff_date=cutoff_date) ss = StorageServer(storedir, self.nodeid, reserved_space=reserved, readonly_storage=readonly, - stats_provider=self.stats_provider, - expiration_enabled=expire, - expiration_mode=mode, - expiration_override_lease_duration=o_l_d, - expiration_cutoff_date=cutoff_date, - expiration_sharetypes=expiration_sharetypes) + stats_provider=self.stats_provider) + self.storage_server = ss self.add_service(ss) + self.accountant = ss.get_accountant() + self.accountant.set_expiration_policy(expiration_policy) + d = self.when_tub_ready() # we can't do registerReference until the Tub is ready def _publish(res): - furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding()) - furl = self.tub.registerReference(ss, furlFile=furl_file) - ann = {"anonymous-storage-FURL": furl, + anonymous_account = self.accountant.get_anonymous_account() + anonymous_account_furlfile = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding()) + anonymous_account_furl = self.tub.registerReference(anonymous_account, furlFile=anonymous_account_furlfile) + ann = {"anonymous-storage-FURL": anonymous_account_furl, "permutation-seed-base32": self._init_permutation_seed(ss), } self.introducer_client.publish("storage", ann, self._node_key) @@ -320,6 +324,9 @@ class Client(node.Node, pollmixin.PollMixin): d.addErrback(log.err, facility="tahoe.init", level=log.BAD, umid="aLGBKw") + def get_accountant(self): + return self.accountant + def init_client(self): helper_furl = self.get_config("client", "helper.furl", None) if helper_furl in ("None", ""): diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index 34e6e545..54dc1df2 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -134,11 +134,11 @@ def dump_immutable_chk_share(f, out, options): if options['offsets']: print >>out print >>out, " Section Offsets:" - print >>out, "%20s: %s" % ("share data", f._data_offset) + print >>out, "%20s: %s" % ("share data", f.DATA_OFFSET) for k in ["data", "plaintext_hash_tree", "crypttext_hash_tree", "block_hashes", "share_hashes", "uri_extension"]: name = {"data": "block data"}.get(k,k) - offset = f._data_offset + offsets[k] + offset = f.DATA_OFFSET + offsets[k] print >>out, " %20s: %s (0x%x)" % (name, offset, offset) def format_expiration_time(expiration_time): @@ -160,8 +160,7 @@ def dump_mutable_share(options): f = open(options['filename'], "rb") WE, nodeid = m._read_write_enabler_and_nodeid(f) data_length = m._read_data_length(f) - extra_lease_offset = m._read_extra_lease_offset(f) - container_size = extra_lease_offset - m.DATA_OFFSET + container_size = m._read_container_size(f) share_type = "unknown" f.seek(m.DATA_OFFSET) @@ -876,8 +875,8 @@ def corrupt_share(options): f = ShareFile(fn) bp = ReadBucketProxy(None, None, '') offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) - start = f._data_offset + offsets["data"] - end = f._data_offset + offsets["plaintext_hash_tree"] + start = f.DATA_OFFSET + offsets["data"] + end = f.DATA_OFFSET + offsets["plaintext_hash_tree"] flip_bit(start, end) diff --git a/src/allmydata/storage/common.py b/src/allmydata/storage/common.py index 865275bc..3a5e0dce 100644 --- a/src/allmydata/storage/common.py +++ b/src/allmydata/storage/common.py @@ -16,6 +16,10 @@ def si_b2a(storageindex): def si_a2b(ascii_storageindex): return base32.a2b(ascii_storageindex) +def storage_index_to_prefix(storageindex): + sia = si_b2a(storageindex) + return sia[:2] + def storage_index_to_dir(storageindex): sia = si_b2a(storageindex) return os.path.join(sia[:2], sia) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 280dff39..780b04a4 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -1,44 +1,42 @@ -import os, stat, struct, time + +import os, struct, time from foolscap.api import Referenceable from zope.interface import implements from allmydata.interfaces import RIBucketWriter, RIBucketReader from allmydata.util import base32, fileutil, log +from allmydata.util.fileutil import get_used_space from allmydata.util.assertutil import precondition -from allmydata.util.hashutil import timing_safe_compare -from allmydata.storage.lease import LeaseInfo from allmydata.storage.common import UnknownImmutableContainerVersionError, \ DataTooLargeError +from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE + -# each share file (in storage/shares/$SI/$SHNUM) contains lease information -# and share data. The share data is accessed by RIBucketWriter.write and -# RIBucketReader.read . The lease information is not accessible through these -# interfaces. +# Each share file (in storage/shares/$SI/$SHNUM) contains share data that +# can be accessed by RIBucketWriter.write and RIBucketReader.read . # The share file has the following layout: # 0x00: share file version number, four bytes, current version is 1 -# 0x04: share data length, four bytes big-endian = A # See Footnote 1 below. -# 0x08: number of leases, four bytes big-endian +# 0x04: share data length, four bytes big-endian # Footnote 1 +# 0x08: number of leases, four bytes big-endian = N # Footnote 2 # 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) -# A+0x0c = B: first lease. Lease format is: -# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner -# B+0x04: renew secret, 32 bytes (SHA256) -# B+0x24: cancel secret, 32 bytes (SHA256) -# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch -# B+0x48: next lease, or end of record - -# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, -# but it is still filled in by storage servers in case the storage server -# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the -# share file is moved from one storage server to another. The value stored in -# this field is truncated, so if the actual share data length is >= 2**32, -# then the value stored in this field will be the actual share data length -# modulo 2**32. +# filesize - 72*N: leases (ignored). Each lease is 72 bytes. + +# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers. + +# Footnote 2: as of Tahoe v1.11.0 this field is not used by storage servers. +# New shares will have a 0 here. Old shares will have whatever value was left +# over when the server was upgraded. All lease information is now kept in the +# leasedb. + class ShareFile: - LEASE_SIZE = struct.calcsize(">L32s32sL") sharetype = "immutable" + LEASE_SIZE = struct.calcsize(">L32s32sL") + HEADER = ">LLL" + HEADER_SIZE = struct.calcsize(HEADER) + DATA_OFFSET = HEADER_SIZE def __init__(self, filename, max_size=None, create=False): """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ @@ -62,35 +60,48 @@ class ShareFile: # clients to read the first part of the share. f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0)) f.close() - self._lease_offset = max_size + 0x0c - self._num_leases = 0 + self._data_length = max_size else: f = open(self.home, 'rb') - filesize = os.path.getsize(self.home) - (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) - f.close() + try: + (version, unused, num_leases) = struct.unpack(self.HEADER, f.read(self.HEADER_SIZE)) + finally: + f.close() if version != 1: msg = "sharefile %s had version %d but we wanted 1" % \ (filename, version) raise UnknownImmutableContainerVersionError(msg) - self._num_leases = num_leases - self._lease_offset = filesize - (num_leases * self.LEASE_SIZE) - self._data_offset = 0xc + + filesize = os.stat(self.home).st_size + self._data_length = filesize - self.DATA_OFFSET - (num_leases * self.LEASE_SIZE) + + # TODO: raise a better exception. + assert self._data_length >= 0, self._data_length + + def get_used_space(self): + return get_used_space(self.home) def unlink(self): os.unlink(self.home) + def get_size(self): + return os.stat(self.home).st_size + def read_share_data(self, offset, length): precondition(offset >= 0) - # reads beyond the end of the data are truncated. Reads that start + + # Reads beyond the end of the data are truncated. Reads that start # beyond the end of the data return an empty string. - seekpos = self._data_offset+offset - actuallength = max(0, min(length, self._lease_offset-seekpos)) + seekpos = self.DATA_OFFSET + offset + actuallength = max(0, min(length, self._data_length - offset)) if actuallength == 0: return "" f = open(self.home, 'rb') - f.seek(seekpos) - return f.read(actuallength) + try: + f.seek(seekpos) + return f.read(actuallength) + finally: + f.close() def write_share_data(self, offset, data): length = len(data) @@ -98,118 +109,33 @@ class ShareFile: if self._max_size is not None and offset+length > self._max_size: raise DataTooLargeError(self._max_size, offset, length) f = open(self.home, 'rb+') - real_offset = self._data_offset+offset - f.seek(real_offset) - assert f.tell() == real_offset - f.write(data) - f.close() - - def _write_lease_record(self, f, lease_number, lease_info): - offset = self._lease_offset + lease_number * self.LEASE_SIZE - f.seek(offset) - assert f.tell() == offset - f.write(lease_info.to_immutable_data()) - - def _read_num_leases(self, f): - f.seek(0x08) - (num_leases,) = struct.unpack(">L", f.read(4)) - return num_leases - - def _write_num_leases(self, f, num_leases): - f.seek(0x08) - f.write(struct.pack(">L", num_leases)) - - def _truncate_leases(self, f, num_leases): - f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) - - def get_leases(self): - """Yields a LeaseInfo instance for all leases.""" - f = open(self.home, 'rb') - (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) - f.seek(self._lease_offset) - for i in range(num_leases): - data = f.read(self.LEASE_SIZE) - if data: - yield LeaseInfo().from_immutable_data(data) - - def add_lease(self, lease_info): - f = open(self.home, 'rb+') - num_leases = self._read_num_leases(f) - self._write_lease_record(f, num_leases, lease_info) - self._write_num_leases(f, num_leases+1) - f.close() - - def renew_lease(self, renew_secret, new_expire_time): - for i,lease in enumerate(self.get_leases()): - if timing_safe_compare(lease.renew_secret, renew_secret): - # yup. See if we need to update the owner time. - if new_expire_time > lease.expiration_time: - # yes - lease.expiration_time = new_expire_time - f = open(self.home, 'rb+') - self._write_lease_record(f, i, lease) - f.close() - return - raise IndexError("unable to renew non-existent lease") - - def add_or_renew_lease(self, lease_info): try: - self.renew_lease(lease_info.renew_secret, - lease_info.expiration_time) - except IndexError: - self.add_lease(lease_info) - - - def cancel_lease(self, cancel_secret): - """Remove a lease with the given cancel_secret. If the last lease is - cancelled, the file will be removed. Return the number of bytes that - were freed (by truncating the list of leases, and possibly by - deleting the file. Raise IndexError if there was no lease with the - given cancel_secret. - """ - - leases = list(self.get_leases()) - num_leases_removed = 0 - for i,lease in enumerate(leases): - if timing_safe_compare(lease.cancel_secret, cancel_secret): - leases[i] = None - num_leases_removed += 1 - if not num_leases_removed: - raise IndexError("unable to find matching lease to cancel") - if num_leases_removed: - # pack and write out the remaining leases. We write these out in - # the same order as they were added, so that if we crash while - # doing this, we won't lose any non-cancelled leases. - leases = [l for l in leases if l] # remove the cancelled leases - f = open(self.home, 'rb+') - for i,lease in enumerate(leases): - self._write_lease_record(f, i, lease) - self._write_num_leases(f, len(leases)) - self._truncate_leases(f, len(leases)) + real_offset = self.DATA_OFFSET + offset + f.seek(real_offset) + assert f.tell() == real_offset + f.write(data) + finally: f.close() - space_freed = self.LEASE_SIZE * num_leases_removed - if not len(leases): - space_freed += os.stat(self.home)[stat.ST_SIZE] - self.unlink() - return space_freed class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): + def __init__(self, ss, account, storage_index, shnum, + incominghome, finalhome, max_size, canary): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self._max_size = max_size # don't allow the client to write more than this + self._account = account + self._storage_index = storage_index + self._shnum = shnum self._canary = canary self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False self.throw_out_all_data = False self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) - # also, add our lease to the file now, so that other ones can be - # added by simultaneous uploaders - self._sharefile.add_lease(lease_info) + self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) def allocated_size(self): return self._max_size @@ -257,8 +183,10 @@ class BucketWriter(Referenceable): self.closed = True self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - filelen = os.stat(self.finalhome)[stat.ST_SIZE] + filelen = get_used_space(self.finalhome) self.ss.bucket_writer_closed(self, filelen) + self._account.add_or_renew_default_lease(self._storage_index, self._shnum) + self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) self.ss.add_latency("close", time.time() - start) self.ss.count("close") @@ -285,6 +213,7 @@ class BucketWriter(Referenceable): if not os.listdir(parentdir): os.rmdir(parentdir) self._sharefile = None + self._account.remove_share_and_leases(self._storage_index, self._shnum) # We are now considered closed for further writing. We must tell # the storage server about this so that it stops expecting us to @@ -315,7 +244,7 @@ class BucketReader(Referenceable): return data def remote_advise_corrupt_share(self, reason): - return self.ss.remote_advise_corrupt_share("immutable", + return self.ss.client_advise_corrupt_share("immutable", self.storage_index, self.shnum, reason) diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py index a8392eac..4d834958 100644 --- a/src/allmydata/storage/mutable.py +++ b/src/allmydata/storage/mutable.py @@ -1,10 +1,11 @@ -import os, stat, struct + +import os, struct from allmydata.interfaces import BadWriteEnablerError from allmydata.util import idlib, log from allmydata.util.assertutil import precondition from allmydata.util.hashutil import timing_safe_compare -from allmydata.storage.lease import LeaseInfo +from allmydata.util.fileutil import get_used_space from allmydata.storage.common import UnknownMutableContainerVersionError, \ DataTooLargeError from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE @@ -39,7 +40,6 @@ class MutableShareFile: sharetype = "mutable" DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") - EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases LEASE_SIZE = struct.calcsize(">LL32s32s20s") assert LEASE_SIZE == 92 @@ -61,7 +61,7 @@ class MutableShareFile: data = f.read(self.HEADER_SIZE) (magic, write_enabler_nodeid, write_enabler, - data_length, extra_least_offset) = \ + data_length, extra_lease_offset) = \ struct.unpack(">32s20s32sQQ", data) if magic != self.MAGIC: msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ @@ -92,17 +92,31 @@ class MutableShareFile: # extra leases go here, none at creation f.close() + def get_used_space(self): + return get_used_space(self.home) + def unlink(self): os.unlink(self.home) + def get_size(self): + return os.stat(self.home).st_size + def _read_data_length(self, f): f.seek(self.DATA_LENGTH_OFFSET) (data_length,) = struct.unpack(">Q", f.read(8)) return data_length + def _read_container_size(self, f): + f.seek(self.DATA_LENGTH_OFFSET + 8) + (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) + return extra_lease_offset - self.DATA_OFFSET + def _write_data_length(self, f, data_length): + extra_lease_offset = self.DATA_OFFSET + data_length f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">Q", data_length)) + f.write(struct.pack(">QQ", data_length, extra_lease_offset)) + f.seek(extra_lease_offset) + f.write(struct.pack(">L", 0)) def _read_share_data(self, f, offset, length): precondition(offset >= 0) @@ -118,75 +132,17 @@ class MutableShareFile: data = f.read(length) return data - def _read_extra_lease_offset(self, f): - f.seek(self.EXTRA_LEASE_OFFSET) - (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) - return extra_lease_offset - - def _write_extra_lease_offset(self, f, offset): - f.seek(self.EXTRA_LEASE_OFFSET) - f.write(struct.pack(">Q", offset)) - - def _read_num_extra_leases(self, f): - offset = self._read_extra_lease_offset(f) - f.seek(offset) - (num_extra_leases,) = struct.unpack(">L", f.read(4)) - return num_extra_leases - - def _write_num_extra_leases(self, f, num_leases): - extra_lease_offset = self._read_extra_lease_offset(f) - f.seek(extra_lease_offset) - f.write(struct.pack(">L", num_leases)) - - def _change_container_size(self, f, new_container_size): - if new_container_size > self.MAX_SIZE: - raise DataTooLargeError() - old_extra_lease_offset = self._read_extra_lease_offset(f) - new_extra_lease_offset = self.DATA_OFFSET + new_container_size - if new_extra_lease_offset < old_extra_lease_offset: - # TODO: allow containers to shrink. For now they remain large. - return - num_extra_leases = self._read_num_extra_leases(f) - f.seek(old_extra_lease_offset) - leases_size = 4 + num_extra_leases * self.LEASE_SIZE - extra_lease_data = f.read(leases_size) - - # Zero out the old lease info (in order to minimize the chance that - # it could accidentally be exposed to a reader later, re #1528). - f.seek(old_extra_lease_offset) - f.write('\x00' * leases_size) - f.flush() - - # An interrupt here will corrupt the leases. - - f.seek(new_extra_lease_offset) - f.write(extra_lease_data) - self._write_extra_lease_offset(f, new_extra_lease_offset) - def _write_share_data(self, f, offset, data): length = len(data) precondition(offset >= 0) data_length = self._read_data_length(f) - extra_lease_offset = self._read_extra_lease_offset(f) if offset+length >= data_length: # They are expanding their data size. - if self.DATA_OFFSET+offset+length > extra_lease_offset: - # TODO: allow containers to shrink. For now, they remain - # large. - - # Their new data won't fit in the current container, so we - # have to move the leases. With luck, they're expanding it - # more than the size of the extra lease block, which will - # minimize the corrupt-the-share window - self._change_container_size(f, offset+length) - extra_lease_offset = self._read_extra_lease_offset(f) + if offset+length > self.MAX_SIZE: + raise DataTooLargeError() - # an interrupt here is ok.. the container has been enlarged - # but the data remains untouched - - assert self.DATA_OFFSET+offset+length <= extra_lease_offset # Their data now fits in the current container. We must write # their new data and modify the recorded data size. @@ -205,161 +161,6 @@ class MutableShareFile: f.write(data) return - def _write_lease_record(self, f, lease_number, lease_info): - extra_lease_offset = self._read_extra_lease_offset(f) - num_extra_leases = self._read_num_extra_leases(f) - if lease_number < 4: - offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE - elif (lease_number-4) < num_extra_leases: - offset = (extra_lease_offset - + 4 - + (lease_number-4)*self.LEASE_SIZE) - else: - # must add an extra lease record - self._write_num_extra_leases(f, num_extra_leases+1) - offset = (extra_lease_offset - + 4 - + (lease_number-4)*self.LEASE_SIZE) - f.seek(offset) - assert f.tell() == offset - f.write(lease_info.to_mutable_data()) - - def _read_lease_record(self, f, lease_number): - # returns a LeaseInfo instance, or None - extra_lease_offset = self._read_extra_lease_offset(f) - num_extra_leases = self._read_num_extra_leases(f) - if lease_number < 4: - offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE - elif (lease_number-4) < num_extra_leases: - offset = (extra_lease_offset - + 4 - + (lease_number-4)*self.LEASE_SIZE) - else: - raise IndexError("No such lease number %d" % lease_number) - f.seek(offset) - assert f.tell() == offset - data = f.read(self.LEASE_SIZE) - lease_info = LeaseInfo().from_mutable_data(data) - if lease_info.owner_num == 0: - return None - return lease_info - - def _get_num_lease_slots(self, f): - # how many places do we have allocated for leases? Not all of them - # are filled. - num_extra_leases = self._read_num_extra_leases(f) - return 4+num_extra_leases - - def _get_first_empty_lease_slot(self, f): - # return an int with the index of an empty slot, or None if we do not - # currently have an empty slot - - for i in range(self._get_num_lease_slots(f)): - if self._read_lease_record(f, i) is None: - return i - return None - - def get_leases(self): - """Yields a LeaseInfo instance for all leases.""" - f = open(self.home, 'rb') - for i, lease in self._enumerate_leases(f): - yield lease - f.close() - - def _enumerate_leases(self, f): - for i in range(self._get_num_lease_slots(f)): - try: - data = self._read_lease_record(f, i) - if data is not None: - yield i,data - except IndexError: - return - - def add_lease(self, lease_info): - precondition(lease_info.owner_num != 0) # 0 means "no lease here" - f = open(self.home, 'rb+') - num_lease_slots = self._get_num_lease_slots(f) - empty_slot = self._get_first_empty_lease_slot(f) - if empty_slot is not None: - self._write_lease_record(f, empty_slot, lease_info) - else: - self._write_lease_record(f, num_lease_slots, lease_info) - f.close() - - def renew_lease(self, renew_secret, new_expire_time): - accepting_nodeids = set() - f = open(self.home, 'rb+') - for (leasenum,lease) in self._enumerate_leases(f): - if timing_safe_compare(lease.renew_secret, renew_secret): - # yup. See if we need to update the owner time. - if new_expire_time > lease.expiration_time: - # yes - lease.expiration_time = new_expire_time - self._write_lease_record(f, leasenum, lease) - f.close() - return - accepting_nodeids.add(lease.nodeid) - f.close() - # Return the accepting_nodeids set, to give the client a chance to - # update the leases on a share which has been migrated from its - # original server to a new one. - msg = ("Unable to renew non-existent lease. I have leases accepted by" - " nodeids: ") - msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid)) - for anid in accepting_nodeids]) - msg += " ." - raise IndexError(msg) - - def add_or_renew_lease(self, lease_info): - precondition(lease_info.owner_num != 0) # 0 means "no lease here" - try: - self.renew_lease(lease_info.renew_secret, - lease_info.expiration_time) - except IndexError: - self.add_lease(lease_info) - - def cancel_lease(self, cancel_secret): - """Remove any leases with the given cancel_secret. If the last lease - is cancelled, the file will be removed. Return the number of bytes - that were freed (by truncating the list of leases, and possibly by - deleting the file. Raise IndexError if there was no lease with the - given cancel_secret.""" - - accepting_nodeids = set() - modified = 0 - remaining = 0 - blank_lease = LeaseInfo(owner_num=0, - renew_secret="\x00"*32, - cancel_secret="\x00"*32, - expiration_time=0, - nodeid="\x00"*20) - f = open(self.home, 'rb+') - for (leasenum,lease) in self._enumerate_leases(f): - accepting_nodeids.add(lease.nodeid) - if timing_safe_compare(lease.cancel_secret, cancel_secret): - self._write_lease_record(f, leasenum, blank_lease) - modified += 1 - else: - remaining += 1 - if modified: - freed_space = self._pack_leases(f) - f.close() - if not remaining: - freed_space += os.stat(self.home)[stat.ST_SIZE] - self.unlink() - return freed_space - - msg = ("Unable to cancel non-existent lease. I have leases " - "accepted by nodeids: ") - msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid)) - for anid in accepting_nodeids]) - msg += " ." - raise IndexError(msg) - - def _pack_leases(self, f): - # TODO: reclaim space from cancelled leases - return 0 - def _read_write_enabler_and_nodeid(self, f): f.seek(0) data = f.read(self.HEADER_SIZE) @@ -378,12 +179,6 @@ class MutableShareFile: f.close() return datav -# def remote_get_length(self): -# f = open(self.home, 'rb') -# data_length = self._read_data_length(f) -# f.close() -# return data_length - def check_write_enabler(self, write_enabler, si_s): f = open(self.home, 'rb+') (real_write_enabler, write_enabler_nodeid) = \ @@ -422,9 +217,7 @@ class MutableShareFile: cur_length = self._read_data_length(f) if new_length < cur_length: self._write_data_length(f, new_length) - # TODO: if we're going to shrink the share file when the - # share data has shrunk, then call - # self._change_container_size() here. + # TODO: shrink the share file. f.close() def testv_compare(a, op, b): diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 9e7ea15b..857ea21a 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -1,22 +1,24 @@ + import os, re, weakref, struct, time -from foolscap.api import Referenceable from twisted.application import service from zope.interface import implements -from allmydata.interfaces import RIStorageServer, IStatsProducer +from allmydata.interfaces import IStatsProducer from allmydata.util import fileutil, idlib, log, time_format import allmydata # for __full_version__ from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported -from allmydata.storage.lease import LeaseInfo from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler -from allmydata.storage.expirer import LeaseCheckingCrawler +from allmydata.storage.accountant import Accountant +from allmydata.storage.expiration import ExpirationPolicy +from allmydata.storage.leasedb import SHARETYPE_MUTABLE + # storage/ # storage/shares/incoming @@ -33,19 +35,16 @@ NUM_RE=re.compile("^[0-9]+$") -class StorageServer(service.MultiService, Referenceable): - implements(RIStorageServer, IStatsProducer) +class StorageServer(service.MultiService): + implements(IStatsProducer) name = 'storage' - LeaseCheckerClass = LeaseCheckingCrawler + BucketCounterClass = BucketCountingCrawler + DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False) def __init__(self, storedir, nodeid, reserved_space=0, readonly_storage=False, stats_provider=None, - expiration_enabled=False, - expiration_mode="age", - expiration_override_lease_duration=None, - expiration_cutoff_date=None, - expiration_sharetypes=("mutable", "immutable")): + expiration_policy=None): service.MultiService.__init__(self) assert isinstance(nodeid, str) assert len(nodeid) == 20 @@ -85,16 +84,29 @@ class StorageServer(service.MultiService, Referenceable): "cancel": [], } self.add_bucket_counter() + self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY) + + def init_accountant(self, expiration_policy): + dbfile = os.path.join(self.storedir, "leasedb.sqlite") + statefile = os.path.join(self.storedir, "leasedb_crawler.state") + self.accountant = Accountant(self, dbfile, statefile) + self.accountant.set_expiration_policy(expiration_policy) + self.accountant.setServiceParent(self) + + def get_accountant(self): + return self.accountant + + def get_accounting_crawler(self): + return self.accountant.get_accounting_crawler() + + def get_expiration_policy(self): + return self.accountant.get_accounting_crawler().get_expiration_policy() - statefile = os.path.join(self.storedir, "lease_checker.state") - historyfile = os.path.join(self.storedir, "lease_checker.history") - klass = self.LeaseCheckerClass - self.lease_checker = klass(self, statefile, historyfile, - expiration_enabled, expiration_mode, - expiration_override_lease_duration, - expiration_cutoff_date, - expiration_sharetypes) - self.lease_checker.setServiceParent(self) + def get_bucket_counter(self): + return self.bucket_counter + + def get_nodeid(self): + return self.my_nodeid def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) @@ -217,7 +229,9 @@ class StorageServer(service.MultiService, Referenceable): space += bw.allocated_size() return space - def remote_get_version(self): + # these methods can be invoked by our callers + + def client_get_version(self, account): remaining_space = self.get_available_space() if remaining_space is None: # We're on a platform that has no API to get disk stats. @@ -231,18 +245,15 @@ class StorageServer(service.MultiService, Referenceable): "delete-mutable-shares-with-zero-length-writev": True, "fills-holes-with-zero-bytes": True, "prevents-read-past-end-of-share-data": True, + "accounting-v1": {}, }, "application-version": str(allmydata.__full_version__), } return version - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, + def client_allocate_buckets(self, storage_index, sharenums, allocated_size, - canary, owner_num=0): - # owner_num is not for clients to set, but rather it should be - # curried into the PersonalStorageServer instance that is dedicated - # to a particular owner. + canary, account): start = time.time() self.count("allocate") alreadygot = set() @@ -252,14 +263,8 @@ class StorageServer(service.MultiService, Referenceable): log.msg("storage: allocate_buckets %s" % si_s) - # in this implementation, the lease information (including secrets) - # goes into the share files themselves. It could also be put into a - # separate database. Note that the lease should not be added until - # the BucketWriter has been closed. - expire_time = time.time() + 31*24*60*60 - lease_info = LeaseInfo(owner_num, - renew_secret, cancel_secret, - expire_time, self.my_nodeid) + # Note that the lease should not be added until the BucketWriter has + # been closed. This is handled in BucketWriter.close() max_space_per_bucket = allocated_size @@ -278,8 +283,6 @@ class StorageServer(service.MultiService, Referenceable): # file, they'll want us to hold leases for this file. for (shnum, fn) in self._get_bucket_shares(storage_index): alreadygot.add(shnum) - sf = ShareFile(fn) - sf.add_or_renew_lease(lease_info) for shnum in sharenums: incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) @@ -295,8 +298,9 @@ class StorageServer(service.MultiService, Referenceable): pass elif (not limited) or (remaining_space >= max_space_per_bucket): # ok! we need to create the new share file. - bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info, canary) + bw = BucketWriter(self, account, storage_index, shnum, + incominghome, finalhome, + max_space_per_bucket, canary) bucketwriters[shnum] = bw self._active_writers[bw] = 1 if limited: @@ -327,31 +331,6 @@ class StorageServer(service.MultiService, Referenceable): continue # non-sharefile yield sf - def remote_add_lease(self, storage_index, renew_secret, cancel_secret, - owner_num=1): - start = time.time() - self.count("add-lease") - new_expire_time = time.time() + 31*24*60*60 - lease_info = LeaseInfo(owner_num, - renew_secret, cancel_secret, - new_expire_time, self.my_nodeid) - for sf in self._iter_share_files(storage_index): - sf.add_or_renew_lease(lease_info) - self.add_latency("add-lease", time.time() - start) - return None - - def remote_renew_lease(self, storage_index, renew_secret): - start = time.time() - self.count("renew") - new_expire_time = time.time() + 31*24*60*60 - found_buckets = False - for sf in self._iter_share_files(storage_index): - found_buckets = True - sf.renew_lease(renew_secret, new_expire_time) - self.add_latency("renew", time.time() - start) - if not found_buckets: - raise IndexError("no such lease to renew") - def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) @@ -371,7 +350,7 @@ class StorageServer(service.MultiService, Referenceable): # Commonly caused by there being no buckets at all. pass - def remote_get_buckets(self, storage_index): + def client_get_buckets(self, storage_index): start = time.time() self.count("get") si_s = si_b2a(storage_index) @@ -383,32 +362,17 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("get", time.time() - start) return bucketreaders - def get_leases(self, storage_index): - """Provide an iterator that yields all of the leases attached to this - bucket. Each lease is returned as a LeaseInfo instance. - - This method is not for client use. - """ - - # since all shares get the same lease data, we just grab the leases - # from the first share - try: - shnum, filename = self._get_bucket_shares(storage_index).next() - sf = ShareFile(filename) - return sf.get_leases() - except StopIteration: - return iter([]) - - def remote_slot_testv_and_readv_and_writev(self, storage_index, - secrets, + def client_slot_testv_and_readv_and_writev(self, storage_index, + write_enabler, test_and_write_vectors, - read_vector): + read_vector, account): start = time.time() self.count("writev") si_s = si_b2a(storage_index) + log.msg("storage: slot_writev %s" % si_s) si_dir = storage_index_to_dir(storage_index) - (write_enabler, renew_secret, cancel_secret) = secrets + # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_dir) shares = {} @@ -447,12 +411,6 @@ class StorageServer(service.MultiService, Referenceable): for sharenum, share in shares.items(): read_data[sharenum] = share.readv(read_vector) - ownerid = 1 # TODO - expire_time = time.time() + 31*24*60*60 # one month - lease_info = LeaseInfo(ownerid, - renew_secret, cancel_secret, - expire_time, self.my_nodeid) - if testv_is_good: # now apply the write vectors for sharenum in test_and_write_vectors: @@ -460,32 +418,37 @@ class StorageServer(service.MultiService, Referenceable): if new_length == 0: if sharenum in shares: shares[sharenum].unlink() + account.remove_share_and_leases(storage_index, sharenum) else: if sharenum not in shares: # allocate a new share - allocated_size = 2000 # arbitrary, really - share = self._allocate_slot_share(bucketdir, secrets, + allocated_size = 2000 # arbitrary, really # REMOVE + share = self._allocate_slot_share(bucketdir, + write_enabler, sharenum, - allocated_size, - owner_num=0) + allocated_size) shares[sharenum] = share - shares[sharenum].writev(datav, new_length) - # and update the lease - shares[sharenum].add_or_renew_lease(lease_info) + shares[sharenum].writev(datav, new_length) + account.add_share(storage_index, sharenum, + shares[sharenum].get_used_space(), SHARETYPE_MUTABLE) + else: + # apply the write vector and update the lease + shares[sharenum].writev(datav, new_length) + + account.add_or_renew_default_lease(storage_index, sharenum) + account.mark_share_as_stable(storage_index, sharenum, + shares[sharenum].get_used_space()) if new_length == 0: # delete empty bucket directories if not os.listdir(bucketdir): os.rmdir(bucketdir) - # all done self.add_latency("writev", time.time() - start) return (testv_is_good, read_data) - def _allocate_slot_share(self, bucketdir, secrets, sharenum, - allocated_size, owner_num=0): - (write_enabler, renew_secret, cancel_secret) = secrets + def _allocate_slot_share(self, bucketdir, write_enabler, sharenum, allocated_size): my_nodeid = self.my_nodeid fileutil.make_dirs(bucketdir) filename = os.path.join(bucketdir, "%d" % sharenum) @@ -493,7 +456,12 @@ class StorageServer(service.MultiService, Referenceable): self) return share - def remote_slot_readv(self, storage_index, shares, readv): + def delete_share(self, storage_index, shnum): + si_dir = storage_index_to_dir(storage_index) + filename = os.path.join(self.sharedir, si_dir, "%d" % (shnum,)) + os.unlink(filename) + + def client_slot_readv(self, storage_index, shares, readv, account): start = time.time() self.count("readv") si_s = si_b2a(storage_index) @@ -520,8 +488,7 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("readv", time.time() - start) return datavs - def remote_advise_corrupt_share(self, share_type, storage_index, shnum, - reason): + def client_advise_corrupt_share(self, share_type, storage_index, shnum, reason): fileutil.make_dirs(self.corruption_advisory_dir) now = time_format.iso_utc(sep="T") si_s = si_b2a(storage_index) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index e532db1a..79326f9a 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -75,7 +75,7 @@ class StorageFarmBroker: # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy()) + s = NativeStorageServer(None, ann.copy()) s.rref = rref s._is_connected = True self.servers[serverid] = s diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 8dd9a2f9..35398fa1 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -282,7 +282,8 @@ class NoNetworkGrid(service.MultiService): ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss - wrapper = wrap_storage_server(ss) + aa = ss.get_accountant().get_anonymous_account() + wrapper = wrap_storage_server(aa) self.wrappers_by_id[serverid] = wrapper self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) self.rebuild_serverlist() diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index d030c12a..0e345ae0 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): "my-version": "ver", "oldest-supported": "oldest", } - s = NativeStorageServer(key_s, ann) + s = NativeStorageServer(key_s, ann, None) sb.test_add_server(peerid, s) # XXX: maybe use key_s? c = FakeClient() c.storage_broker = sb @@ -451,8 +451,9 @@ class AddLease(GridTestMixin, unittest.TestCase): def broken_add_lease(*args, **kwargs): really_did_break.append(1) raise KeyError("intentional failure, should be ignored") - assert self.g.servers_by_number[0].remote_add_lease - self.g.servers_by_number[0].remote_add_lease = broken_add_lease + ss = self.g.servers_by_number[0].get_accountant().get_anonymous_account() + assert ss.remote_add_lease + ss.remote_add_lease = broken_add_lease d.addCallback(_break_add_lease) # and confirm that the files still look healthy diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index f28ba017..a17c68a1 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -273,12 +273,12 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): self.failUnlessRaises(OldConfigOptionError, client.Client, basedir) def _permute(self, sb, key): - return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] + return [ base32.a2b(s.get_longname()) for s in sb.get_servers_for_psi(key) ] def test_permute(self): sb = StorageFarmBroker(None, True) for k in ["%d" % i for i in range(5)]: - ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", + ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(k), "permutation-seed-base32": base32.b2a(k) } sb.test_add_rref(k, "rref", ann) @@ -295,8 +295,9 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): "[storage]\n" + \ "enabled = true\n") c = client.Client(basedir) - ss = c.getServiceNamed("storage") - verdict = ss.remote_get_version() + server = c.getServiceNamed("storage") + aa = server.get_accountant().get_anonymous_account() + verdict = aa.remote_get_version() self.failUnlessReallyEqual(verdict["application-version"], str(allmydata.__full_version__)) self.failIfEqual(str(allmydata.__version__), "unknown") diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index 7f48a2c0..5b8e2ab9 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -76,10 +76,10 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): server.setServiceParent(self.s) return server - def write(self, i, ss, serverid, tail=0): + def write(self, i, aa, serverid, tail=0): si = self.si(i) si = si[:-1] + chr(tail) - had,made = ss.remote_allocate_buckets(si, + had,made = aa.remote_allocate_buckets(si, self.rs(i, serverid), self.cs(i, serverid), set([0]), 99, FakeCanary()) @@ -88,12 +88,13 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): return si_b2a(si) def test_service(self): - ss = self.create("crawler/Basic/service") + server = self.create("crawler/Basic/service") + aa = server.get_accountant().get_anonymous_account() - sis = [self.write(i, ss, self.serverid) for i in range(10)] + sis = [self.write(i, aa, self.serverid) for i in range(10)] statefile = os.path.join(self.basedir, "statefile") - c = EnumeratingCrawler(ss, statefile) + c = EnumeratingCrawler(server, statefile) c.setServiceParent(self.s) # it should be legal to call get_state() and get_progress() right @@ -126,7 +127,7 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): # Check that a new crawler picks up on the state file correctly. def _new_crawler(ign): - c2 = EnumeratingCrawler(ss, statefile) + c2 = EnumeratingCrawler(server, statefile) c2.setServiceParent(self.s) d2 = c2.set_hook('after_cycle') @@ -145,13 +146,14 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): # Crawler is accomplishing it's run-slowly goals, re-enable this test # and read the stdout when it runs. - ss = self.create("crawler/Basic/cpu_usage") + server = self.create("crawler/Basic/cpu_usage") + aa = server.get_accountant().get_anonymous_account() for i in range(10): - self.write(i, ss, self.serverid) + self.write(i, aa, self.serverid) statefile = os.path.join(self.basedir, "statefile") - c = ConsumingCrawler(ss, statefile) + c = ConsumingCrawler(server, statefile) c.setServiceParent(self.s) # This will run as fast as it can, consuming about 50ms per call to @@ -187,13 +189,14 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): return d def test_empty_subclass(self): - ss = self.create("crawler/Basic/empty_subclass") + server = self.create("crawler/Basic/empty_subclass") + aa = server.get_accountant().get_anonymous_account() for i in range(10): - self.write(i, ss, self.serverid) + self.write(i, aa, self.serverid) statefile = os.path.join(self.basedir, "statefile") - c = ShareCrawler(ss, statefile) + c = ShareCrawler(server, statefile) c.slow_start = 0 c.setServiceParent(self.s) @@ -205,13 +208,14 @@ class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin): return d def test_oneshot(self): - ss = self.create("crawler/Basic/oneshot") + server = self.create("crawler/Basic/oneshot") + aa = server.get_accountant().get_anonymous_account() for i in range(30): - self.write(i, ss, self.serverid) + self.write(i, aa, self.serverid) statefile = os.path.join(self.basedir, "statefile") - c = EnumeratingCrawler(ss, statefile) + c = EnumeratingCrawler(server, statefile) c.setServiceParent(self.s) d = c.set_hook('after_cycle') diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index e2d6f6a1..df7ee664 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -70,7 +70,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, def _copy_share(self, share, to_server): (sharenum, sharefile) = share (id, ss) = to_server - shares_dir = os.path.join(ss.original.storedir, "shares") + original_server = ss.original.server + shares_dir = os.path.join(original_server.storedir, "shares") si = uri.from_string(self.uri).get_storage_index() si_dir = os.path.join(shares_dir, storage_index_to_dir(si)) if not os.path.exists(si_dir): @@ -79,8 +80,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, shutil.copy(sharefile, new_sharefile) self.shares = self.find_uri_shares(self.uri) # Make sure that the storage server has the share. - self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile) - in self.shares) + self.failUnlessIn((sharenum, original_server.get_nodeid(), new_sharefile), self.shares) def _corrupt_share(self, share, corruptor_func): (sharenum, sharefile) = share diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index cd355cf7..3cf7e403 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -11,7 +11,7 @@ from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \ ssk_pubkey_fingerprint_hash from allmydata.util.consumer import MemoryConsumer -from allmydata.util.deferredutil import gatherResults +from allmydata.util.deferredutil import gatherResults, WaitForDelayedCallsMixin from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \ NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION, DownloadStopped from allmydata.monitor import Monitor @@ -232,14 +232,14 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0): def make_storagebroker(s=None, num_peers=10): if not s: s = FakeStorage() - peerids = [tagged_hash("peerid", "%d" % i)[:20] - for i in range(num_peers)] + serverids = [tagged_hash("peerid", "%d" % i)[:20] + for i in range(num_peers)] storage_broker = StorageFarmBroker(None, True) - for peerid in peerids: - fss = FakeStorageServer(peerid, s) - ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), - "permutation-seed-base32": base32.b2a(peerid) } - storage_broker.test_add_rref(peerid, fss, ann) + for serverid in serverids: + fss = FakeStorageServer(serverid, s) + ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), + "permutation-seed-base32": base32.b2a(serverid) } + storage_broker.test_add_rref(serverid, fss, ann) return storage_broker def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE): @@ -253,7 +253,7 @@ def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE): {"k": 3, "n": 10}, SDMF_VERSION, keygen) return nodemaker -class Filenode(unittest.TestCase, testutil.ShouldFailMixin): +class Filenode(unittest.TestCase, testutil.ShouldFailMixin, WaitForDelayedCallsMixin): # this used to be in Publish, but we removed the limit. Some of # these tests test whether the new code correctly allows files # larger than the limit. @@ -845,6 +845,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): return d d.addCallback(_created) + d.addBoth(self.wait_for_delayed_calls) return d def test_upload_and_download_full_size_keys(self): @@ -897,6 +898,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(_created) d.addCallback(lambda ignored: self.failUnlessEqual(self.n.get_size(), 9)) + d.addBoth(self.wait_for_delayed_calls) return d diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index ca7e0df5..25b1eca7 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -699,29 +699,37 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, return d def test_servers_responding(self): + # This test exercises a bug (ticket #1739) in which the servers-responding list + # did not include servers that responded to the Repair, but not the pre-repair + # filecheck. self.basedir = "repairer/Repairer/servers_responding" self.set_up_grid(num_clients=2) d = self.upload_and_stash() - # now cause one of the servers to not respond during the pre-repair - # filecheck, but then *do* respond to the post-repair filecheck + def _then(ign): + # Cause one of the servers to not respond during the pre-repair + # filecheck, but then *do* respond to the post-repair filecheck. ss = self.g.servers_by_number[0] - self.g.break_server(ss.my_nodeid, count=1) + self.g.break_server(ss.get_nodeid(), count=1) + + shares = self.find_uri_shares(self.uri) + self.failUnlessEqual(len(shares), 10) self.delete_shares_numbered(self.uri, [9]) return self.c0_filenode.check_and_repair(Monitor()) d.addCallback(_then) def _check(rr): - # this exercises a bug in which the servers-responding list did - # not include servers that responded to the Repair, but which did - # not respond to the pre-repair filecheck prr = rr.get_post_repair_results() + + # We expect the repair to have restored all shares... + self.failUnlessEqual(prr.get_share_counter_good(), 10) + + # ... and all the servers should be in servers-responding. expected = set(self.g.get_all_serverids()) - responding_set = frozenset([s.get_serverid() for s in prr.get_servers_responding()]) - self.failIf(expected - responding_set, expected - responding_set) - self.failIf(responding_set - expected, responding_set - expected) - self.failUnlessEqual(expected, - set([s.get_serverid() - for s in prr.get_servers_responding()])) + responding = set([s.get_serverid() for s in prr.get_servers_responding()]) + self.failUnlessEqual(expected, responding, + ("\nexpected - responding = %r" + "\nresponding - expected = %r") + % (expected - responding, responding - expected)) d.addCallback(_check) return d diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 28a12e48..d993d95e 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -4,19 +4,20 @@ import mock from twisted.trial import unittest -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.application import service from foolscap.api import fireEventually import itertools + from allmydata import interfaces -from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format +from allmydata.util import fileutil, hashutil, base32, time_format from allmydata.storage.server import StorageServer from allmydata.storage.mutable import MutableShareFile -from allmydata.storage.immutable import BucketWriter, BucketReader +from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError -from allmydata.storage.lease import LeaseInfo -from allmydata.storage.expirer import LeaseCheckingCrawler +from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE +from allmydata.storage.expiration import ExpirationPolicy from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \ @@ -36,6 +37,15 @@ from allmydata.web.storage import StorageStatus, remove_prefix class Marker: pass + +class FakeAccount: + def add_share(self, storage_index, shnum, used_space, sharetype, commit=True): + pass + def add_or_renew_default_lease(self, storage_index, shnum, commit=True): + pass + def mark_share_as_stable(self, storage_index, shnum, used_space, commit=True): + pass + class FakeCanary: def __init__(self, ignore_disconnectors=False): self.ignore = ignore_disconnectors @@ -57,15 +67,8 @@ class FakeStatsProvider: def register_producer(self, producer): pass -class Bucket(unittest.TestCase): - def make_workdir(self, name): - basedir = os.path.join("storage", "Bucket", name) - incoming = os.path.join(basedir, "tmp", "bucket") - final = os.path.join(basedir, "bucket") - fileutil.make_dirs(basedir) - fileutil.make_dirs(os.path.join(basedir, "tmp")) - return incoming, final +class BucketTestMixin: def bucket_writer_closed(self, bw, consumed): pass def add_latency(self, category, latency): @@ -73,19 +76,20 @@ class Bucket(unittest.TestCase): def count(self, name, delta=1): pass - def make_lease(self): - owner_num = 0 - renew_secret = os.urandom(32) - cancel_secret = os.urandom(32) - expiration_time = time.time() + 5000 - return LeaseInfo(owner_num, renew_secret, cancel_secret, - expiration_time, "\x00" * 20) + +class Bucket(BucketTestMixin, unittest.TestCase): + def make_workdir(self, name): + basedir = os.path.join("storage", "Bucket", name) + incoming = os.path.join(basedir, "tmp", "bucket") + final = os.path.join(basedir, "bucket") + fileutil.make_dirs(basedir) + fileutil.make_dirs(os.path.join(basedir, "tmp")) + return incoming, final def test_create(self): incoming, final = self.make_workdir("test_create") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) - bw.remote_write(0, "a"*25) + bw = BucketWriter(self, FakeAccount(), "si1", 0, incoming, final, 200, FakeCanary()) + bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*25) bw.remote_write(75, "d"*7) @@ -93,28 +97,22 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) - bw.remote_write(0, "a"*25) + bw = BucketWriter(self, FakeAccount(), "si1", 0, incoming, final, 200, FakeCanary()) + bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*7) # last block may be short bw.remote_close() # now read from it br = BucketReader(self, bw.finalhome) - self.failUnlessEqual(br.remote_read(0, 25), "a"*25) + self.failUnlessEqual(br.remote_read(0, 25), "a"*25) self.failUnlessEqual(br.remote_read(25, 25), "b"*25) - self.failUnlessEqual(br.remote_read(50, 7), "c"*7) + self.failUnlessEqual(br.remote_read(50, 7 ), "c"*7 ) def test_read_past_end_of_share_data(self): # test vector for immutable files (hard-coded contents of an immutable share # file): - # The following immutable share file content is identical to that - # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2 - # with share data == 'a'. The total size of this content is 85 - # bytes. - containerdata = struct.pack('>LLL', 1, 1, 1) # A Tahoe-LAFS storage client would send as the share_data a @@ -122,17 +120,8 @@ class Bucket(unittest.TestCase): # -- see allmydata/immutable/layout.py . This test, which is # simulating a client, just sends 'a'. share_data = 'a' - - ownernumber = struct.pack('>L', 0) - renewsecret = 'THIS LETS ME RENEW YOUR FILE....' - assert len(renewsecret) == 32 - cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA' - assert len(cancelsecret) == 32 - expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds - - lease_data = ownernumber + renewsecret + cancelsecret + expirationtime - - share_file_data = containerdata + share_data + lease_data + extra_data = 'b' * ShareFile.LEASE_SIZE + share_file_data = containerdata + share_data + extra_data incoming, final = self.make_workdir("test_read_past_end_of_share_data") @@ -145,12 +134,7 @@ class Bucket(unittest.TestCase): self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data) - # Read past the end of share data to get the cancel secret. - read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret) - - result_of_read = br.remote_read(0, read_length) - self.failUnlessEqual(result_of_read, share_data) - + # Read past the end of share data by 1 byte. result_of_read = br.remote_read(0, len(share_data)+1) self.failUnlessEqual(result_of_read, share_data) @@ -173,34 +157,19 @@ class RemoteBucket: return defer.maybeDeferred(_call) -class BucketProxy(unittest.TestCase): +class BucketProxy(BucketTestMixin, unittest.TestCase): def make_bucket(self, name, size): basedir = os.path.join("storage", "BucketProxy", name) incoming = os.path.join(basedir, "tmp", "bucket") final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = BucketWriter(self, incoming, final, size, self.make_lease(), - FakeCanary()) + si = "si1" + bw = BucketWriter(self, FakeAccount(), si, 0, incoming, final, size, FakeCanary()) rb = RemoteBucket() rb.target = bw return bw, rb, final - def make_lease(self): - owner_num = 0 - renew_secret = os.urandom(32) - cancel_secret = os.urandom(32) - expiration_time = time.time() + 5000 - return LeaseInfo(owner_num, renew_secret, cancel_secret, - expiration_time, "\x00" * 20) - - def bucket_writer_closed(self, bw, consumed): - pass - def add_latency(self, category, latency): - pass - def count(self, name, delta=1): - pass - def test_create(self): bw, rb, sharefname = self.make_bucket("test_create", 500) bp = WriteBucketProxy(rb, None, @@ -322,14 +291,18 @@ class Server(unittest.TestCase): self.create("test_create") def test_declares_fixed_1528(self): - ss = self.create("test_declares_fixed_1528") - ver = ss.remote_get_version() + server = self.create("test_declares_fixed_1528") + aa = server.get_accountant().get_anonymous_account() + + ver = aa.remote_get_version() sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1) def test_declares_maximum_share_sizes(self): - ss = self.create("test_declares_maximum_share_sizes") - ver = ss.remote_get_version() + server = self.create("test_declares_maximum_share_sizes") + aa = server.get_accountant().get_anonymous_account() + + ver = aa.remote_get_version() sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn('maximum-immutable-share-size', sv1) self.failUnlessIn('maximum-mutable-share-size', sv1) @@ -340,12 +313,12 @@ class Server(unittest.TestCase): sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn('available-space', sv1) - def allocate(self, ss, storage_index, sharenums, size, canary=None): + def allocate(self, aa, storage_index, sharenums, size, canary=None): renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()) cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()) if not canary: canary = FakeCanary() - return ss.remote_allocate_buckets(storage_index, + return aa.remote_allocate_buckets(storage_index, renew_secret, cancel_secret, sharenums, size, canary) @@ -358,9 +331,10 @@ class Server(unittest.TestCase): if avail <= 4*2**30: raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.") - ss = self.create("test_large_share") + server = self.create("test_large_share") + aa = server.get_accountant().get_anonymous_account() - already,writers = self.allocate(ss, "allocate", [0], 2**32+2) + already,writers = self.allocate(aa, "allocate", [0], 2**32+2) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0])) @@ -369,7 +343,7 @@ class Server(unittest.TestCase): bucket.remote_write(2**32, "ab") bucket.remote_close() - readers = ss.remote_get_buckets("allocate") + readers = aa.remote_get_buckets("allocate") reader = readers[shnum] self.failUnlessEqual(reader.remote_read(2**32, 2), "ab") @@ -379,8 +353,10 @@ class Server(unittest.TestCase): share lots of leading bits with an extant share (but isn't the exact same storage index), this won't add an entry to the share directory. """ - ss = self.create("test_dont_overfill_dirs") - already, writers = self.allocate(ss, "storageindex", [0], 10) + server = self.create("test_dont_overfill_dirs") + aa = server.get_accountant().get_anonymous_account() + + already, writers = self.allocate(aa, "storageindex", [0], 10) for i, wb in writers.items(): wb.remote_write(0, "%10d" % i) wb.remote_close() @@ -390,7 +366,7 @@ class Server(unittest.TestCase): # Now store another one under another storageindex that has leading # chars the same as the first storageindex. - already, writers = self.allocate(ss, "storageindey", [0], 10) + already, writers = self.allocate(aa, "storageindey", [0], 10) for i, wb in writers.items(): wb.remote_write(0, "%10d" % i) wb.remote_close() @@ -400,8 +376,10 @@ class Server(unittest.TestCase): self.failUnlessEqual(children_of_storedir, new_children_of_storedir) def test_remove_incoming(self): - ss = self.create("test_remove_incoming") - already, writers = self.allocate(ss, "vid", range(3), 10) + server = self.create("test_remove_incoming") + aa = server.get_accountant().get_anonymous_account() + + already, writers = self.allocate(aa, "vid", range(3), 10) for i,wb in writers.items(): wb.remote_write(0, "%10d" % i) wb.remote_close() @@ -417,27 +395,29 @@ class Server(unittest.TestCase): # remote_abort, when called on a writer, should make sure that # the allocated size of the bucket is not counted by the storage # server when accounting for space. - ss = self.create("test_abort") - already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150) - self.failIfEqual(ss.allocated_size(), 0) + server = self.create("test_abort") + aa = server.get_accountant().get_anonymous_account() + + already, writers = self.allocate(aa, "allocate", [0, 1, 2], 150) + self.failIfEqual(server.allocated_size(), 0) # Now abort the writers. for writer in writers.itervalues(): writer.remote_abort() - self.failUnlessEqual(ss.allocated_size(), 0) - + self.failUnlessEqual(server.allocated_size(), 0) def test_allocate(self): - ss = self.create("test_allocate") + server = self.create("test_allocate") + aa = server.get_accountant().get_anonymous_account() - self.failUnlessEqual(ss.remote_get_buckets("allocate"), {}) + self.failUnlessEqual(aa.remote_get_buckets("allocate"), {}) - already,writers = self.allocate(ss, "allocate", [0,1,2], 75) + already,writers = self.allocate(aa, "allocate", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) # while the buckets are open, they should not count as readable - self.failUnlessEqual(ss.remote_get_buckets("allocate"), {}) + self.failUnlessEqual(aa.remote_get_buckets("allocate"), {}) # close the buckets for i,wb in writers.items(): @@ -447,7 +427,7 @@ class Server(unittest.TestCase): wb.remote_abort() # now they should be readable - b = ss.remote_get_buckets("allocate") + b = aa.remote_get_buckets("allocate") self.failUnlessEqual(set(b.keys()), set([0,1,2])) self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0) b_str = str(b[0]) @@ -457,21 +437,21 @@ class Server(unittest.TestCase): # now if we ask about writing again, the server should offer those # three buckets as already present. It should offer them even if we # don't ask about those specific ones. - already,writers = self.allocate(ss, "allocate", [2,3,4], 75) + already,writers = self.allocate(aa, "allocate", [2,3,4], 75) self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([3,4])) # while those two buckets are open for writing, the server should # refuse to offer them to uploaders - already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75) + already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75) self.failUnlessEqual(already2, set([0,1,2])) self.failUnlessEqual(set(writers2.keys()), set([5])) # aborting the writes should remove the tempfiles for i,wb in writers2.items(): wb.remote_abort() - already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75) + already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75) self.failUnlessEqual(already2, set([0,1,2])) self.failUnlessEqual(set(writers2.keys()), set([5])) @@ -481,28 +461,32 @@ class Server(unittest.TestCase): wb.remote_abort() def test_bad_container_version(self): - ss = self.create("test_bad_container_version") - a,w = self.allocate(ss, "si1", [0], 10) + server = self.create("test_bad_container_version") + aa = server.get_accountant().get_anonymous_account() + + a,w = self.allocate(aa, "si1", [0], 10) w[0].remote_write(0, "\xff"*10) w[0].remote_close() - fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0") + fn = os.path.join(server.sharedir, storage_index_to_dir("si1"), "0") f = open(fn, "rb+") f.seek(0) f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1 f.close() - ss.remote_get_buckets("allocate") + aa.remote_get_buckets("allocate") e = self.failUnlessRaises(UnknownImmutableContainerVersionError, - ss.remote_get_buckets, "si1") + aa.remote_get_buckets, "si1") self.failUnlessIn(" had version 0 but we wanted 1", str(e)) def test_disconnect(self): # simulate a disconnection - ss = self.create("test_disconnect") + server = self.create("test_disconnect") + aa = server.get_accountant().get_anonymous_account() + canary = FakeCanary() - already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary) + already,writers = self.allocate(aa, "disconnect", [0,1,2], 75, canary) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) for (f,args,kwargs) in canary.disconnectors.values(): @@ -511,7 +495,7 @@ class Server(unittest.TestCase): del writers # that ought to delete the incoming shares - already,writers = self.allocate(ss, "disconnect", [0,1,2], 75) + already,writers = self.allocate(aa, "disconnect", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) @@ -523,7 +507,9 @@ class Server(unittest.TestCase): 'avail': max(15000 - reserved_space, 0), } - ss = self.create("test_reserved_space", reserved_space=reserved_space) + server = self.create("test_reserved_space", reserved_space=reserved_space) + aa = server.get_accountant().get_anonymous_account() + # 15k available, 10k reserved, leaves 5k for shares # a newly created and filled share incurs this much overhead, beyond @@ -531,22 +517,22 @@ class Server(unittest.TestCase): OVERHEAD = 3*4 LEASE_SIZE = 4+32+32+4 canary = FakeCanary(True) - already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary) + already,writers = self.allocate(aa, "vid1", [0,1,2], 1000, canary) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed - self.failUnlessEqual(len(ss._active_writers), 3) + self.failUnlessEqual(len(server._active_writers), 3) # allocating 1001-byte shares only leaves room for one - already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary) + already2,writers2 = self.allocate(aa, "vid2", [0,1,2], 1001, canary) self.failUnlessEqual(len(writers2), 1) - self.failUnlessEqual(len(ss._active_writers), 4) + self.failUnlessEqual(len(server._active_writers), 4) # we abandon the first set, so their provisional allocation should be # returned del already del writers - self.failUnlessEqual(len(ss._active_writers), 1) + self.failUnlessEqual(len(server._active_writers), 1) # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should @@ -558,7 +544,7 @@ class Server(unittest.TestCase): del already2 del writers2 del bw - self.failUnlessEqual(len(ss._active_writers), 0) + self.failUnlessEqual(len(server._active_writers), 0) allocated = 1001 + OVERHEAD + LEASE_SIZE @@ -571,23 +557,22 @@ class Server(unittest.TestCase): # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # 5000-1085=3915 free, therefore we can fit 39 100byte shares - already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary) + already3,writers3 = self.allocate(aa, "vid3", range(100), 100, canary) self.failUnlessEqual(len(writers3), 39) - self.failUnlessEqual(len(ss._active_writers), 39) + self.failUnlessEqual(len(server._active_writers), 39) del already3 del writers3 - self.failUnlessEqual(len(ss._active_writers), 0) - ss.disownServiceParent() - del ss + self.failUnlessEqual(len(server._active_writers), 0) + server.disownServiceParent() + del server def test_seek(self): basedir = self.workdir("test_seek_behavior") fileutil.make_dirs(basedir) filename = os.path.join(basedir, "testfile") - f = open(filename, "wb") - f.write("start") - f.close() + fileutil.write(filename, "start") + # mode="w" allows seeking-to-create-holes, but truncates pre-existing # files. mode="a" preserves previous contents but does not allow # seeking-to-create-holes. mode="r+" allows both. @@ -600,111 +585,83 @@ class Server(unittest.TestCase): f2 = open(filename, "rb") self.failUnlessEqual(f2.read(5), "start") + def compare_leases(self, leases_a, leases_b, with_timestamps=True): + self.failUnlessEqual(len(leases_a), len(leases_b)) + for i in range(len(leases_a)): + a = leases_a[i] + b = leases_b[i] + self.failUnlessEqual(a.owner_num, b.owner_num) + if with_timestamps: + self.failUnlessEqual(a.renewal_time, b.renewal_time) + self.failUnlessEqual(a.expiration_time, b.expiration_time) def test_leases(self): - ss = self.create("test_leases") + server = self.create("test_leases") + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + canary = FakeCanary() sharenums = range(5) size = 100 - rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - already,writers = ss.remote_allocate_buckets("si0", rs0, cs0, + # create a random non-numeric file in the bucket directory, to + # exercise the code that's supposed to ignore those. + bucket_dir = os.path.join(self.workdir("test_leases"), + "shares", storage_index_to_dir("six")) + os.makedirs(bucket_dir) + fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"), + "you ought to be ignoring me\n") + + already,writers = aa.remote_allocate_buckets("si1", "", "", sharenums, size, canary) self.failUnlessEqual(len(already), 0) self.failUnlessEqual(len(writers), 5) for wb in writers.values(): wb.remote_close() - leases = list(ss.get_leases("si0")) - self.failUnlessEqual(len(leases), 1) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0])) + leases = aa.get_leases("si1") + self.failUnlessEqual(len(leases), 5) - rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - already,writers = ss.remote_allocate_buckets("si1", rs1, cs1, - sharenums, size, canary) - for wb in writers.values(): - wb.remote_close() + aa.add_share("six", 0, 0, SHARETYPE_IMMUTABLE) + # adding a share does not immediately add a lease + self.failUnlessEqual(len(aa.get_leases("six")), 0) - # take out a second lease on si1 - rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - already,writers = ss.remote_allocate_buckets("si1", rs2, cs2, - sharenums, size, canary) - self.failUnlessEqual(len(already), 5) - self.failUnlessEqual(len(writers), 0) + aa.add_or_renew_default_lease("six", 0) + self.failUnlessEqual(len(aa.get_leases("six")), 1) - leases = list(ss.get_leases("si1")) - self.failUnlessEqual(len(leases), 2) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2])) + # add-lease on a missing storage index is silently ignored + self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None) + self.failUnlessEqual(len(aa.get_leases("si18")), 0) - # and a third lease, using add-lease - rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - ss.remote_add_lease("si1", rs2a, cs2a) - leases = list(ss.get_leases("si1")) - self.failUnlessEqual(len(leases), 3) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a])) + all_leases = aa.get_leases("si1") - # add-lease on a missing storage index is silently ignored - self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None) - - # check that si0 is readable - readers = ss.remote_get_buckets("si0") - self.failUnlessEqual(len(readers), 5) - - # renew the first lease. Only the proper renew_secret should work - ss.remote_renew_lease("si0", rs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1) - - # check that si0 is still readable - readers = ss.remote_get_buckets("si0") - self.failUnlessEqual(len(readers), 5) - - # There is no such method as remote_cancel_lease for now -- see - # ticket #1528. - self.failIf(hasattr(ss, 'remote_cancel_lease'), \ - "ss should not have a 'remote_cancel_lease' method/attribute") - - # test overlapping uploads - rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), - hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) - already,writers = ss.remote_allocate_buckets("si3", rs3, cs3, - sharenums, size, canary) - self.failUnlessEqual(len(already), 0) - self.failUnlessEqual(len(writers), 5) - already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4, - sharenums, size, canary) - self.failUnlessEqual(len(already2), 0) - self.failUnlessEqual(len(writers2), 0) - for wb in writers.values(): - wb.remote_close() + # renew the lease directly + aa.remote_renew_lease("si1", "") + self.failUnlessEqual(len(aa.get_leases("si1")), 5) + self.compare_leases(all_leases, aa.get_leases("si1"), with_timestamps=False) - leases = list(ss.get_leases("si3")) - self.failUnlessEqual(len(leases), 1) + # Now allocate more leases using a different account. + # A new lease should be allocated for every share in the shareset. + sa.remote_renew_lease("si1", "") + self.failUnlessEqual(len(aa.get_leases("si1")), 5) + self.failUnlessEqual(len(sa.get_leases("si1")), 5) - already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4, - sharenums, size, canary) - self.failUnlessEqual(len(already3), 5) - self.failUnlessEqual(len(writers3), 0) + all_leases2 = sa.get_leases("si1") - leases = list(ss.get_leases("si3")) - self.failUnlessEqual(len(leases), 2) + sa.remote_renew_lease("si1", "") + self.compare_leases(all_leases2, sa.get_leases("si1"), with_timestamps=False) def test_readonly(self): workdir = self.workdir("test_readonly") - ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True) - ss.setServiceParent(self.sparent) + server = StorageServer(workdir, "\x00" * 20, readonly_storage=True) + server.setServiceParent(self.sparent) + aa = server.get_accountant().get_anonymous_account() - already,writers = self.allocate(ss, "vid", [0,1,2], 75) + already,writers = self.allocate(aa, "vid", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(writers, {}) - stats = ss.get_stats() + stats = server.get_stats() self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0) if "storage_server.disk_avail" in stats: # Some platforms may not have an API to get disk stats. @@ -713,11 +670,12 @@ class Server(unittest.TestCase): def test_advise_corruption(self): workdir = self.workdir("test_advise_corruption") - ss = StorageServer(workdir, "\x00" * 20) - ss.setServiceParent(self.sparent) + server = StorageServer(workdir, "\x00" * 20) + server.setServiceParent(self.sparent) + aa = server.get_accountant().get_anonymous_account() si0_s = base32.b2a("si0") - ss.remote_advise_corrupt_share("immutable", "si0", 0, + aa.remote_advise_corrupt_share("immutable", "si0", 0, "This share smells funny.\n") reportdir = os.path.join(workdir, "corruption-advisories") reports = os.listdir(reportdir) @@ -734,13 +692,13 @@ class Server(unittest.TestCase): # test the RIBucketWriter version too si1_s = base32.b2a("si1") - already,writers = self.allocate(ss, "si1", [1], 75) + already,writers = self.allocate(aa, "si1", [1], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([1])) writers[1].remote_write(0, "data") writers[1].remote_close() - b = ss.remote_get_buckets("si1") + b = aa.remote_get_buckets("si1") self.failUnlessEqual(set(b.keys()), set([1])) b[1].remote_advise_corrupt_share("This share tastes like dust.\n") @@ -788,11 +746,11 @@ class MutableServer(unittest.TestCase): def cancel_secret(self, tag): return hashutil.tagged_hash("cancel_blah", str(tag)) - def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size): + def allocate(self, aa, storage_index, we_tag, lease_tag, sharenums, size): write_enabler = self.write_enabler(we_tag) renew_secret = self.renew_secret(lease_tag) cancel_secret = self.cancel_secret(lease_tag) - rstaraw = ss.remote_slot_testv_and_readv_and_writev + rstaraw = aa.remote_slot_testv_and_readv_and_writev testandwritev = dict( [ (shnum, ([], [], None) ) for shnum in sharenums ] ) readv = [] @@ -807,25 +765,29 @@ class MutableServer(unittest.TestCase): def test_bad_magic(self): - ss = self.create("test_bad_magic") - self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10) - fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0") + server = self.create("test_bad_magic") + aa = server.get_accountant().get_anonymous_account() + + self.allocate(aa, "si1", "we1", self._lease_secret.next(), set([0]), 10) + fn = os.path.join(server.sharedir, storage_index_to_dir("si1"), "0") f = open(fn, "rb+") f.seek(0) f.write("BAD MAGIC") f.close() - read = ss.remote_slot_readv + read = aa.remote_slot_readv e = self.failUnlessRaises(UnknownMutableContainerVersionError, read, "si1", [0], [(0,10)]) self.failUnlessIn(" had magic ", str(e)) self.failUnlessIn(" but we wanted ", str(e)) def test_container_size(self): - ss = self.create("test_container_size") - self.allocate(ss, "si1", "we1", self._lease_secret.next(), + server = self.create("test_container_size") + aa = server.get_accountant().get_anonymous_account() + + self.allocate(aa, "si1", "we1", self._lease_secret.next(), set([0,1,2]), 100) - read = ss.remote_slot_readv - rstaraw = ss.remote_slot_testv_and_readv_and_writev + read = aa.remote_slot_readv + rstaraw = aa.remote_slot_testv_and_readv_and_writev secrets = ( self.write_enabler("we1"), self.renew_secret("we1"), self.cancel_secret("we1") ) @@ -905,7 +867,7 @@ class MutableServer(unittest.TestCase): # Also see if the server explicitly declares that it supports this # feature. - ver = ss.remote_get_version() + ver = aa.remote_get_version() storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"] self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes")) @@ -919,11 +881,13 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(read_answer, {}) def test_allocate(self): - ss = self.create("test_allocate") - self.allocate(ss, "si1", "we1", self._lease_secret.next(), + server = self.create("test_allocate") + aa = server.get_accountant().get_anonymous_account() + + self.allocate(aa, "si1", "we1", self._lease_secret.next(), set([0,1,2]), 100) - read = ss.remote_slot_readv + read = aa.remote_slot_readv self.failUnlessEqual(read("si1", [0], [(0, 10)]), {0: [""]}) self.failUnlessEqual(read("si1", [], [(0, 10)]), @@ -936,7 +900,7 @@ class MutableServer(unittest.TestCase): self.renew_secret("we1"), self.cancel_secret("we1") ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev + write = aa.remote_slot_testv_and_readv_and_writev answer = write("si1", secrets, {0: ([], [(0,data)], None)}, []) @@ -990,14 +954,15 @@ class MutableServer(unittest.TestCase): def test_operators(self): # test operators, the data we're comparing is '11111' in all cases. # test both fail+pass, reset data after each one. - ss = self.create("test_operators") + server = self.create("test_operators") + aa = server.get_accountant().get_anonymous_account() secrets = ( self.write_enabler("we1"), self.renew_secret("we1"), self.cancel_secret("we1") ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = aa.remote_slot_testv_and_readv_and_writev + read = aa.remote_slot_readv def reset(): write("si1", secrets, @@ -1168,13 +1133,15 @@ class MutableServer(unittest.TestCase): reset() def test_readv(self): - ss = self.create("test_readv") + server = self.create("test_readv") + aa = server.get_accountant().get_anonymous_account() + secrets = ( self.write_enabler("we1"), self.renew_secret("we1"), self.cancel_secret("we1") ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = aa.remote_slot_testv_and_readv_and_writev + read = aa.remote_slot_readv data = [("%d" % i) * 100 for i in range(3)] rc = write("si1", secrets, {0: ([], [(0,data[0])], None), @@ -1188,127 +1155,100 @@ class MutableServer(unittest.TestCase): 1: ["1"*10], 2: ["2"*10]}) - def compare_leases_without_timestamps(self, leases_a, leases_b): - self.failUnlessEqual(len(leases_a), len(leases_b)) - for i in range(len(leases_a)): - a = leases_a[i] - b = leases_b[i] - self.failUnlessEqual(a.owner_num, b.owner_num) - self.failUnlessEqual(a.renew_secret, b.renew_secret) - self.failUnlessEqual(a.cancel_secret, b.cancel_secret) - self.failUnlessEqual(a.nodeid, b.nodeid) - - def compare_leases(self, leases_a, leases_b): + def compare_leases(self, leases_a, leases_b, with_timestamps=True): self.failUnlessEqual(len(leases_a), len(leases_b)) for i in range(len(leases_a)): a = leases_a[i] b = leases_b[i] - self.failUnlessEqual(a.owner_num, b.owner_num) - self.failUnlessEqual(a.renew_secret, b.renew_secret) - self.failUnlessEqual(a.cancel_secret, b.cancel_secret) - self.failUnlessEqual(a.nodeid, b.nodeid) - self.failUnlessEqual(a.expiration_time, b.expiration_time) + self.failUnlessEqual(a.owner_num, b.owner_num) + if with_timestamps: + self.failUnlessEqual(a.renewal_time, b.renewal_time) + self.failUnlessEqual(a.expiration_time, b.expiration_time) def test_leases(self): - ss = self.create("test_leases") + server = self.create("test_leases") + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + def secrets(n): return ( self.write_enabler("we1"), self.renew_secret("we1-%d" % n), self.cancel_secret("we1-%d" % n) ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv - rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, []) + write = aa.remote_slot_testv_and_readv_and_writev + write2 = sa.remote_slot_testv_and_readv_and_writev + read = aa.remote_slot_readv + rc = write("si0", secrets(0), {0: ([], [(0,data)], None)}, []) self.failUnlessEqual(rc, (True, {})) # create a random non-numeric file in the bucket directory, to # exercise the code that's supposed to ignore those. bucket_dir = os.path.join(self.workdir("test_leases"), - "shares", storage_index_to_dir("si1")) - f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w") - f.write("you ought to be ignoring me\n") - f.close() + "shares", storage_index_to_dir("six")) + os.makedirs(bucket_dir) + fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"), + "you ought to be ignoring me\n") s0 = MutableShareFile(os.path.join(bucket_dir, "0")) - self.failUnlessEqual(len(list(s0.get_leases())), 1) + s0.create("nodeid", secrets(0)[0]) + + aa.add_share("six", 0, 0, SHARETYPE_MUTABLE) + # adding a share does not immediately add a lease + self.failUnlessEqual(len(aa.get_leases("six")), 0) + + aa.add_or_renew_default_lease("six", 0) + self.failUnlessEqual(len(aa.get_leases("six")), 1) # add-lease on a missing storage index is silently ignored - self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None) + self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None) + self.failUnlessEqual(len(aa.get_leases("si18")), 0) - # re-allocate the slots and use the same secrets, that should update - # the lease + # update the lease by writing write("si1", secrets(0), {0: ([], [(0,data)], None)}, []) - self.failUnlessEqual(len(list(s0.get_leases())), 1) + self.failUnlessEqual(len(aa.get_leases("si1")), 1) # renew it directly - ss.remote_renew_lease("si1", secrets(0)[1]) - self.failUnlessEqual(len(list(s0.get_leases())), 1) - - # now allocate them with a bunch of different secrets, to trigger the - # extended lease code. Use add_lease for one of them. - write("si1", secrets(1), {0: ([], [(0,data)], None)}, []) - self.failUnlessEqual(len(list(s0.get_leases())), 2) - secrets2 = secrets(2) - ss.remote_add_lease("si1", secrets2[1], secrets2[2]) - self.failUnlessEqual(len(list(s0.get_leases())), 3) - write("si1", secrets(3), {0: ([], [(0,data)], None)}, []) - write("si1", secrets(4), {0: ([], [(0,data)], None)}, []) - write("si1", secrets(5), {0: ([], [(0,data)], None)}, []) - - self.failUnlessEqual(len(list(s0.get_leases())), 6) - - all_leases = list(s0.get_leases()) - # and write enough data to expand the container, forcing the server - # to move the leases - write("si1", secrets(0), - {0: ([], [(0,data)], 200), }, - []) - - # read back the leases, make sure they're still intact. - self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) - - ss.remote_renew_lease("si1", secrets(0)[1]) - ss.remote_renew_lease("si1", secrets(1)[1]) - ss.remote_renew_lease("si1", secrets(2)[1]) - ss.remote_renew_lease("si1", secrets(3)[1]) - ss.remote_renew_lease("si1", secrets(4)[1]) - self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) + aa.remote_renew_lease("si1", secrets(0)[1]) + self.failUnlessEqual(len(aa.get_leases("si1")), 1) + + # now allocate another lease using a different account + write2("si1", secrets(1), {0: ([], [(0,data)], None)}, []) + self.failUnlessEqual(len(aa.get_leases("si1")), 1) + self.failUnlessEqual(len(sa.get_leases("si1")), 1) + + aa_leases = aa.get_leases("si1") + sa_leases = sa.get_leases("si1") + + aa.remote_renew_lease("si1", secrets(0)[1]) + self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False) + + sa.remote_renew_lease("si1", secrets(1)[1]) + self.compare_leases(sa_leases, sa.get_leases("si1"), with_timestamps=False) + # get a new copy of the leases, with the current timestamps. Reading - # data and failing to renew/cancel leases should leave the timestamps - # alone. - all_leases = list(s0.get_leases()) - # renewing with a bogus token should prompt an error message - - # examine the exception thus raised, make sure the old nodeid is - # present, to provide for share migration - e = self.failUnlessRaises(IndexError, - ss.remote_renew_lease, "si1", - secrets(20)[1]) - e_s = str(e) - self.failUnlessIn("Unable to renew non-existent lease", e_s) - self.failUnlessIn("I have leases accepted by nodeids:", e_s) - self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s) - - self.compare_leases(all_leases, list(s0.get_leases())) + # data should leave the timestamps alone. + aa_leases = aa.get_leases("si1") # reading shares should not modify the timestamp read("si1", [], [(0,200)]) - self.compare_leases(all_leases, list(s0.get_leases())) + self.compare_leases(aa_leases, aa.get_leases("si1")) write("si1", secrets(0), {0: ([], [(200, "make me bigger")], None)}, []) - self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) + self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False) write("si1", secrets(0), {0: ([], [(500, "make me really bigger")], None)}, []) - self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) + self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False) def test_remove(self): - ss = self.create("test_remove") - self.allocate(ss, "si1", "we1", self._lease_secret.next(), + server = self.create("test_remove") + aa = server.get_accountant().get_anonymous_account() + + self.allocate(aa, "si1", "we1", self._lease_secret.next(), set([0,1,2]), 100) - readv = ss.remote_slot_readv - writev = ss.remote_slot_testv_and_readv_and_writev + readv = aa.remote_slot_readv + writev = aa.remote_slot_testv_and_readv_and_writev secrets = ( self.write_enabler("we1"), self.renew_secret("we1"), self.cancel_secret("we1") ) @@ -1353,9 +1293,9 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def setUp(self): self.sparent = LoggingServiceParent() self._lease_secret = itertools.count() - self.ss = self.create("MDMFProxies storage test server") + self.aa = self.create("MDMFProxies storage test server") self.rref = RemoteBucket() - self.rref.target = self.ss + self.rref.target = self.aa self.secrets = (self.write_enabler("we_secret"), self.renew_secret("renew_secret"), self.cancel_secret("cancel_secret")) @@ -1401,7 +1341,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): workdir = self.workdir(name) server = StorageServer(workdir, "\x00" * 20) server.setServiceParent(self.sparent) - return server + return server.get_accountant().get_anonymous_account() def build_test_mdmf_share(self, tail_segment=False, empty=False): # Start with the checkstring @@ -1505,12 +1445,12 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): tail_segment=False, empty=False): """ - I write some data for the read tests to read to self.ss + I write some data for the read tests to read to self.aa If tail_segment=True, then I will write a share that has a smaller tail segment than other segments. """ - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.aa.remote_slot_testv_and_readv_and_writev data = self.build_test_mdmf_share(tail_segment, empty) # Finally, we write the whole thing to the storage server in one # pass. @@ -1521,7 +1461,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): results = write(storage_index, self.secrets, tws, readv) self.failUnless(results[0]) - def build_test_sdmf_share(self, empty=False): if empty: sharedata = "" @@ -1578,7 +1517,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # Some tests need SDMF shares to verify that we can still # read them. This method writes one, which resembles but is not assert self.rref - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.aa.remote_slot_testv_and_readv_and_writev share = self.build_test_sdmf_share(empty) testvs = [(0, 1, "eq", "")] tws = {} @@ -1894,7 +1833,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # blocks. mw = self._make_new_mw("si1", 0) # Test writing some blocks. - read = self.ss.remote_slot_readv + read = self.aa.remote_slot_readv expected_private_key_offset = struct.calcsize(MDMFHEADER) expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \ PRIVATE_KEY_SIZE + \ @@ -2666,7 +2605,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): d = sdmfr.finish_publishing() def _then(ignored): self.failUnlessEqual(self.rref.write_count, 1) - read = self.ss.remote_slot_readv + read = self.aa.remote_slot_readv self.failUnlessEqual(read("si1", [0], [(0, len(data))]), {0: [data]}) d.addCallback(_then) @@ -2722,7 +2661,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): sdmfw.finish_publishing()) def _then_again(results): self.failUnless(results[0]) - read = self.ss.remote_slot_readv + read = self.aa.remote_slot_readv self.failUnlessEqual(read("si1", [0], [(1, 8)]), {0: [struct.pack(">Q", 1)]}) self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]), @@ -2963,38 +2902,8 @@ class BucketCounterTest(unittest.TestCase, CrawlerTestMixin, ReallyEqualMixin): d.addBoth(self._wait_for_yield, bucket_counter) return d -class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler): - stop_after_first_bucket = False - def process_bucket(self, *args, **kwargs): - LeaseCheckingCrawler.process_bucket(self, *args, **kwargs) - if self.stop_after_first_bucket: - self.stop_after_first_bucket = False - self.cpu_slice = -1.0 - def yielding(self, sleep_time): - if not self.stop_after_first_bucket: - self.cpu_slice = 500 - -class BrokenStatResults: - pass -class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler): - def stat(self, fn): - s = os.stat(fn) - bsr = BrokenStatResults() - for attrname in dir(s): - if attrname.startswith("_"): - continue - if attrname == "st_blocks": - continue - setattr(bsr, attrname, getattr(s, attrname)) - return bsr - -class InstrumentedStorageServer(StorageServer): - LeaseCheckerClass = InstrumentedLeaseCheckingCrawler -class No_ST_BLOCKS_StorageServer(StorageServer): - LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler - -class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): +class AccountingCrawlerTest(unittest.TestCase, CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin): def setUp(self): self.s = service.MultiService() self.s.startService() @@ -3003,7 +2912,10 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): return self.s.stopService() - def make_shares(self, ss): + def make_shares(self, server): + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + def make(si): return (si, hashutil.tagged_hash("renew", si), hashutil.tagged_hash("cancel", si)) @@ -3027,37 +2939,41 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): # inner contents are not a valid CHK share data = "\xff" * 1000 - a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums, + a,w = aa.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums, 1000, canary) w[0].remote_write(0, data) w[0].remote_close() - a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums, + a,w = aa.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums, 1000, canary) w[0].remote_write(0, data) w[0].remote_close() - ss.remote_add_lease(immutable_si_1, rs1a, cs1a) + sa.remote_add_lease(immutable_si_1, rs1a, cs1a) - writev = ss.remote_slot_testv_and_readv_and_writev + writev = aa.remote_slot_testv_and_readv_and_writev writev(mutable_si_2, (we2, rs2, cs2), {0: ([], [(0,data)], len(data))}, []) writev(mutable_si_3, (we3, rs3, cs3), {0: ([], [(0,data)], len(data))}, []) - ss.remote_add_lease(mutable_si_3, rs3a, cs3a) + sa.remote_add_lease(mutable_si_3, rs3a, cs3a) self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a] self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a] - def BROKEN_test_basic(self): - basedir = "storage/LeaseCrawler/basic" + def test_basic(self): + basedir = "storage/AccountingCrawler/basic" fileutil.make_dirs(basedir) - server = InstrumentedStorageServer(basedir, "\x00" * 20) - # make it start sooner than usual. - lc = server.lease_checker - lc.slow_start = 0 - lc.cpu_slice = 500 - lc.stop_after_first_bucket = True + ep = ExpirationPolicy(enabled=False) + server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep) + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + + # finish as fast as possible + ac = server.get_accounting_crawler() + ac.slow_start = 0 + ac.cpu_slice = 500 + webstatus = StorageStatus(server) # create a few shares, with some leases on them @@ -3068,13 +2984,11 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): fn = os.path.join(server.sharedir, storage_index_to_dir(immutable_si_0), "not-a-share") - f = open(fn, "wb") - f.write("I am not a share.\n") - f.close() + fileutil.write(fn, "I am not a share.\n") # this is before the crawl has started, so we're not in a cycle yet - initial_state = lc.get_state() - self.failIf(lc.get_progress()["cycle-in-progress"]) + initial_state = ac.get_state() + self.failIf(ac.get_progress()["cycle-in-progress"]) self.failIfIn("cycle-to-date", initial_state) self.failIfIn("estimated-remaining-cycle", initial_state) self.failIfIn("estimated-current-cycle", initial_state) @@ -3085,23 +2999,16 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): DAY = 24*60*60 - d = fireEventually() + # now examine the state right after the 'aa' prefix has been processed. + d = self._after_prefix(None, 'aa', ac) + def _after_aa_prefix(state): + self.failUnlessIn("cycle-to-date", state) + self.failUnlessIn("estimated-remaining-cycle", state) + self.failUnlessIn("estimated-current-cycle", state) + self.failUnlessIn("history", state) + self.failUnlessEqual(state["history"], {}) - # now examine the state right after the first bucket has been - # processed. - def _after_first_bucket(ignored): - initial_state = lc.get_state() - if "cycle-to-date" not in initial_state: - d2 = fireEventually() - d2.addCallback(_after_first_bucket) - return d2 - self.failUnlessIn("cycle-to-date", initial_state) - self.failUnlessIn("estimated-remaining-cycle", initial_state) - self.failUnlessIn("estimated-current-cycle", initial_state) - self.failUnlessIn("history", initial_state) - self.failUnlessEqual(initial_state["history"], {}) - - so_far = initial_state["cycle-to-date"] + so_far = state["cycle-to-date"] self.failUnlessEqual(so_far["expiration-enabled"], False) self.failUnlessIn("configured-expiration-mode", so_far) self.failUnlessIn("lease-age-histogram", so_far) @@ -3109,20 +3016,18 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessEqual(type(lah), list) self.failUnlessEqual(len(lah), 1) self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] ) - self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1}) self.failUnlessEqual(so_far["corrupt-shares"], []) sr1 = so_far["space-recovered"] self.failUnlessEqual(sr1["examined-buckets"], 1) self.failUnlessEqual(sr1["examined-shares"], 1) self.failUnlessEqual(sr1["actual-shares"], 0) - left = initial_state["estimated-remaining-cycle"] + left = state["estimated-remaining-cycle"] sr2 = left["space-recovered"] self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"]) self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"]) self.failIfEqual(sr2["actual-shares"], None) - self.failIfEqual(sr2["configured-diskbytes"], None) - self.failIfEqual(sr2["original-sharebytes"], None) - d.addCallback(_after_first_bucket) + d.addCallback(_after_aa_prefix) + d.addCallback(lambda ign: self.render1(webstatus)) def _check_html_in_cycle(html): s = remove_tags(html) @@ -3131,22 +3036,24 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessIn("and has recovered: " "0 shares, 0 buckets (0 mutable / 0 immutable), " "0 B (0 B / 0 B)", s) + + return ac.set_hook('after_cycle') d.addCallback(_check_html_in_cycle) - # wait for the crawler to finish the first cycle. Nothing should have - # been removed. - def _wait(): - return bool(lc.get_state()["last-cycle-finished"] is not None) - d.addCallback(lambda ign: self.poll(_wait)) + def _after_first_cycle(cycle): + # After the first cycle, nothing should have been removed. + self.failUnlessEqual(cycle, 0) + progress = ac.get_progress() + self.failUnlessReallyEqual(progress["cycle-in-progress"], False) - def _after_first_cycle(ignored): - s = lc.get_state() + s = ac.get_state() self.failIf("cycle-to-date" in s) self.failIf("estimated-remaining-cycle" in s) self.failIf("estimated-current-cycle" in s) last = s["history"][0] + self.failUnlessEqual(type(last), dict, repr(last)) self.failUnlessIn("cycle-start-finish-times", last) - self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple) + self.failUnlessEqual(type(last["cycle-start-finish-times"]), list, repr(last)) self.failUnlessEqual(last["expiration-enabled"], False) self.failUnlessIn("configured-expiration-mode", last) @@ -3154,9 +3061,8 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): lah = last["lease-age-histogram"] self.failUnlessEqual(type(lah), list) self.failUnlessEqual(len(lah), 1) - self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] ) + self.failUnlessEqual(tuple(lah[0]), (0.0, DAY, 6) ) - self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2}) self.failUnlessEqual(last["corrupt-shares"], []) rec = last["space-recovered"] @@ -3166,58 +3072,49 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessEqual(rec["actual-shares"], 0) self.failUnlessEqual(rec["actual-diskbytes"], 0) - def _get_sharefile(si): - return list(server._iter_share_files(si))[0] def count_leases(si): - return len(list(_get_sharefile(si).get_leases())) - self.failUnlessEqual(count_leases(immutable_si_0), 1) - self.failUnlessEqual(count_leases(immutable_si_1), 2) - self.failUnlessEqual(count_leases(mutable_si_2), 1) - self.failUnlessEqual(count_leases(mutable_si_3), 2) + return (len(aa.get_leases(si)), len(sa.get_leases(si))) + self.failUnlessEqual(count_leases(immutable_si_0), (1, 0)) + self.failUnlessEqual(count_leases(immutable_si_1), (1, 1)) + self.failUnlessEqual(count_leases(mutable_si_2), (1, 0)) + self.failUnlessEqual(count_leases(mutable_si_3), (1, 1)) d.addCallback(_after_first_cycle) + d.addCallback(lambda ign: self.render1(webstatus)) - def _check_html(html): + def _check_html_after_cycle(html): s = remove_tags(html) self.failUnlessIn("recovered: 0 shares, 0 buckets " "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s) self.failUnlessIn("and saw a total of 4 shares, 4 buckets " "(2 mutable / 2 immutable),", s) self.failUnlessIn("but expiration was not enabled", s) - d.addCallback(_check_html) + d.addCallback(_check_html_after_cycle) + d.addCallback(lambda ign: self.render_json(webstatus)) - def _check_json(json): + def _check_json_after_cycle(json): data = simplejson.loads(json) self.failUnlessIn("lease-checker", data) self.failUnlessIn("lease-checker-progress", data) - d.addCallback(_check_json) + d.addCallback(_check_json_after_cycle) + d.addBoth(self._wait_for_yield, ac) return d - def backdate_lease(self, sf, renew_secret, new_expire_time): - # ShareFile.renew_lease ignores attempts to back-date a lease (i.e. - # "renew" a lease with a new_expire_time that is older than what the - # current lease has), so we have to reach inside it. - for i,lease in enumerate(sf.get_leases()): - if lease.renew_secret == renew_secret: - lease.expiration_time = new_expire_time - f = open(sf.home, 'rb+') - sf._write_lease_record(f, i, lease) - f.close() - return - raise IndexError("unable to renew non-existent lease") - - def BROKEN_test_expire_age(self): - basedir = "storage/LeaseCrawler/expire_age" + def test_expire_age(self): + basedir = "storage/AccountingCrawler/expire_age" fileutil.make_dirs(basedir) # setting expiration_time to 2000 means that any lease which is more # than 2000s old will be expired. - server = InstrumentedStorageServer(basedir, "\x00" * 20, - expiration_enabled=True, - expiration_mode="age", - expiration_override_lease_duration=2000) - # make it start sooner than usual. - lc = server.lease_checker - lc.slow_start = 0 - lc.stop_after_first_bucket = True + now = time.time() + ep = ExpirationPolicy(enabled=True, mode="age", override_lease_duration=2000) + server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep) + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + + # finish as fast as possible + ac = server.get_accounting_crawler() + ac.slow_start = 0 + ac.cpu_slice = 500 + webstatus = StorageStatus(server) # create a few shares, with some leases on them @@ -3229,54 +3126,45 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): def _get_sharefile(si): return list(server._iter_share_files(si))[0] def count_leases(si): - return len(list(_get_sharefile(si).get_leases())) + return (len(aa.get_leases(si)), len(sa.get_leases(si))) self.failUnlessEqual(count_shares(immutable_si_0), 1) - self.failUnlessEqual(count_leases(immutable_si_0), 1) + self.failUnlessEqual(count_leases(immutable_si_0), (1, 0)) self.failUnlessEqual(count_shares(immutable_si_1), 1) - self.failUnlessEqual(count_leases(immutable_si_1), 2) + self.failUnlessEqual(count_leases(immutable_si_1), (1, 1)) self.failUnlessEqual(count_shares(mutable_si_2), 1) - self.failUnlessEqual(count_leases(mutable_si_2), 1) + self.failUnlessEqual(count_leases(mutable_si_2), (1, 0)) self.failUnlessEqual(count_shares(mutable_si_3), 1) - self.failUnlessEqual(count_leases(mutable_si_3), 2) + self.failUnlessEqual(count_leases(mutable_si_3), (1, 1)) + + # artificially crank back the renewal time on the first lease of each + # share to 3000s ago, and set the expiration time to 31 days later. + new_renewal_time = now - 3000 + new_expiration_time = new_renewal_time + 31*24*60*60 - # artificially crank back the expiration time on the first lease of - # each share, to make it look like it expired already (age=1000s). # Some shares have an extra lease which is set to expire at the # default time in 31 days from now (age=31days). We then run the # crawler, which will expire the first lease, making some shares get # deleted and others stay alive (with one remaining lease) - now = time.time() - sf0 = _get_sharefile(immutable_si_0) - self.backdate_lease(sf0, self.renew_secrets[0], now - 1000) + aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time) # immutable_si_1 gets an extra lease - sf1 = _get_sharefile(immutable_si_1) - self.backdate_lease(sf1, self.renew_secrets[1], now - 1000) + sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time) - sf2 = _get_sharefile(mutable_si_2) - self.backdate_lease(sf2, self.renew_secrets[3], now - 1000) + aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time) # mutable_si_3 gets an extra lease - sf3 = _get_sharefile(mutable_si_3) - self.backdate_lease(sf3, self.renew_secrets[4], now - 1000) + sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time) server.setServiceParent(self.s) - d = fireEventually() - # examine the state right after the first bucket has been processed - def _after_first_bucket(ignored): - p = lc.get_progress() - if not p["cycle-in-progress"]: - d2 = fireEventually() - d2.addCallback(_after_first_bucket) - return d2 - d.addCallback(_after_first_bucket) + # now examine the web status right after the 'aa' prefix has been processed. + d = self._after_prefix(None, 'aa', ac) d.addCallback(lambda ign: self.render1(webstatus)) def _check_html_in_cycle(html): s = remove_tags(html) - # the first bucket encountered gets deleted, and its prefix + # the first shareset encountered gets deleted, and its prefix # happens to be about 1/5th of the way through the ring, so the # predictor thinks we'll have 5 shares and that we'll delete them # all. This part of the test depends upon the SIs landing right @@ -3286,29 +3174,28 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessIn("The whole cycle is expected to examine " "5 shares in 5 buckets and to recover: " "5 shares, 5 buckets", s) - d.addCallback(_check_html_in_cycle) - # wait for the crawler to finish the first cycle. Two shares should - # have been removed - def _wait(): - return bool(lc.get_state()["last-cycle-finished"] is not None) - d.addCallback(lambda ign: self.poll(_wait)) + return ac.set_hook('after_cycle') + d.addCallback(_check_html_in_cycle) def _after_first_cycle(ignored): self.failUnlessEqual(count_shares(immutable_si_0), 0) self.failUnlessEqual(count_shares(immutable_si_1), 1) - self.failUnlessEqual(count_leases(immutable_si_1), 1) + self.failUnlessEqual(count_leases(immutable_si_1), (1, 0)) self.failUnlessEqual(count_shares(mutable_si_2), 0) self.failUnlessEqual(count_shares(mutable_si_3), 1) - self.failUnlessEqual(count_leases(mutable_si_3), 1) + self.failUnlessEqual(count_leases(mutable_si_3), (1, 0)) - s = lc.get_state() + s = ac.get_state() last = s["history"][0] self.failUnlessEqual(last["expiration-enabled"], True) - self.failUnlessEqual(last["configured-expiration-mode"], - ("age", 2000, None, ("mutable", "immutable"))) - self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2}) + cem = last["configured-expiration-mode"] + self.failUnlessEqual(cem[0], "age") + self.failUnlessEqual(cem[1], 2000) + self.failUnlessEqual(cem[2], None) + self.failUnlessEqual(cem[3][0], "mutable") + self.failUnlessEqual(cem[3][1], "immutable") rec = last["space-recovered"] self.failUnlessEqual(rec["examined-buckets"], 4) @@ -3320,30 +3207,34 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnless(rec["actual-diskbytes"] >= 0, rec["actual-diskbytes"]) d.addCallback(_after_first_cycle) + d.addCallback(lambda ign: self.render1(webstatus)) - def _check_html(html): + def _check_html_after_cycle(html): s = remove_tags(html) self.failUnlessIn("Expiration Enabled: expired leases will be removed", s) self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s) self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s) - d.addCallback(_check_html) + d.addCallback(_check_html_after_cycle) + d.addBoth(self._wait_for_yield, ac) return d - def BROKEN_test_expire_cutoff_date(self): - basedir = "storage/LeaseCrawler/expire_cutoff_date" + def test_expire_cutoff_date(self): + basedir = "storage/AccountingCrawler/expire_cutoff_date" fileutil.make_dirs(basedir) # setting cutoff-date to 2000 seconds ago means that any lease which # is more than 2000s old will be expired. now = time.time() then = int(now - 2000) - server = InstrumentedStorageServer(basedir, "\x00" * 20, - expiration_enabled=True, - expiration_mode="cutoff-date", - expiration_cutoff_date=then) - # make it start sooner than usual. - lc = server.lease_checker - lc.slow_start = 0 - lc.stop_after_first_bucket = True + ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then) + server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep) + aa = server.get_accountant().get_anonymous_account() + sa = server.get_accountant().get_starter_account() + + # finish as fast as possible + ac = server.get_accounting_crawler() + ac.slow_start = 0 + ac.cpu_slice = 500 + webstatus = StorageStatus(server) # create a few shares, with some leases on them @@ -3355,54 +3246,41 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): def _get_sharefile(si): return list(server._iter_share_files(si))[0] def count_leases(si): - return len(list(_get_sharefile(si).get_leases())) + return (len(aa.get_leases(si)), len(sa.get_leases(si))) self.failUnlessEqual(count_shares(immutable_si_0), 1) - self.failUnlessEqual(count_leases(immutable_si_0), 1) + self.failUnlessEqual(count_leases(immutable_si_0), (1, 0)) self.failUnlessEqual(count_shares(immutable_si_1), 1) - self.failUnlessEqual(count_leases(immutable_si_1), 2) + self.failUnlessEqual(count_leases(immutable_si_1), (1, 1)) self.failUnlessEqual(count_shares(mutable_si_2), 1) - self.failUnlessEqual(count_leases(mutable_si_2), 1) + self.failUnlessEqual(count_leases(mutable_si_2), (1, 0)) self.failUnlessEqual(count_shares(mutable_si_3), 1) - self.failUnlessEqual(count_leases(mutable_si_3), 2) + self.failUnlessEqual(count_leases(mutable_si_3), (1, 1)) - # artificially crank back the expiration time on the first lease of - # each share, to make it look like was renewed 3000s ago. To achieve - # this, we need to set the expiration time to now-3000+31days. This - # will change when the lease format is improved to contain both - # create/renew time and duration. - new_expiration_time = now - 3000 + 31*24*60*60 + # artificially crank back the renewal time on the first lease of each + # share to 3000s ago, and set the expiration time to 31 days later. + new_renewal_time = now - 3000 + new_expiration_time = new_renewal_time + 31*24*60*60 # Some shares have an extra lease which is set to expire at the # default time in 31 days from now (age=31days). We then run the # crawler, which will expire the first lease, making some shares get # deleted and others stay alive (with one remaining lease) - sf0 = _get_sharefile(immutable_si_0) - self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time) + aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time) # immutable_si_1 gets an extra lease - sf1 = _get_sharefile(immutable_si_1) - self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time) + sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time) - sf2 = _get_sharefile(mutable_si_2) - self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time) + aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time) # mutable_si_3 gets an extra lease - sf3 = _get_sharefile(mutable_si_3) - self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time) + sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time) server.setServiceParent(self.s) - d = fireEventually() - # examine the state right after the first bucket has been processed - def _after_first_bucket(ignored): - p = lc.get_progress() - if not p["cycle-in-progress"]: - d2 = fireEventually() - d2.addCallback(_after_first_bucket) - return d2 - d.addCallback(_after_first_bucket) + # now examine the web status right after the 'aa' prefix has been processed. + d = self._after_prefix(None, 'aa', ac) d.addCallback(lambda ign: self.render1(webstatus)) def _check_html_in_cycle(html): s = remove_tags(html) @@ -3416,31 +3294,28 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessIn("The whole cycle is expected to examine " "5 shares in 5 buckets and to recover: " "5 shares, 5 buckets", s) - d.addCallback(_check_html_in_cycle) - # wait for the crawler to finish the first cycle. Two shares should - # have been removed - def _wait(): - return bool(lc.get_state()["last-cycle-finished"] is not None) - d.addCallback(lambda ign: self.poll(_wait)) + return ac.set_hook('after_cycle') + d.addCallback(_check_html_in_cycle) def _after_first_cycle(ignored): self.failUnlessEqual(count_shares(immutable_si_0), 0) self.failUnlessEqual(count_shares(immutable_si_1), 1) - self.failUnlessEqual(count_leases(immutable_si_1), 1) + self.failUnlessEqual(count_leases(immutable_si_1), (1, 0)) self.failUnlessEqual(count_shares(mutable_si_2), 0) self.failUnlessEqual(count_shares(mutable_si_3), 1) - self.failUnlessEqual(count_leases(mutable_si_3), 1) + self.failUnlessEqual(count_leases(mutable_si_3), (1, 0)) - s = lc.get_state() + s = ac.get_state() last = s["history"][0] self.failUnlessEqual(last["expiration-enabled"], True) - self.failUnlessEqual(last["configured-expiration-mode"], - ("cutoff-date", None, then, - ("mutable", "immutable"))) - self.failUnlessEqual(last["leases-per-share-histogram"], - {1: 2, 2: 2}) + cem = last["configured-expiration-mode"] + self.failUnlessEqual(cem[0], "cutoff-date") + self.failUnlessEqual(cem[1], None) + self.failUnlessEqual(cem[2], then) + self.failUnlessEqual(cem[3][0], "mutable") + self.failUnlessEqual(cem[3][1], "immutable") rec = last["space-recovered"] self.failUnlessEqual(rec["examined-buckets"], 4) @@ -3452,8 +3327,9 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnless(rec["actual-diskbytes"] >= 0, rec["actual-diskbytes"]) d.addCallback(_after_first_cycle) + d.addCallback(lambda ign: self.render1(webstatus)) - def _check_html(html): + def _check_html_after_cycle(html): s = remove_tags(html) self.failUnlessIn("Expiration Enabled:" " expired leases will be removed", s) @@ -3461,16 +3337,13 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): substr = "Leases created or last renewed before %s will be considered expired." % date self.failUnlessIn(substr, s) self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s) - d.addCallback(_check_html) + d.addCallback(_check_html_after_cycle) + d.addBoth(self._wait_for_yield, ac) return d - def test_bad_mode(self): - basedir = "storage/LeaseCrawler/bad_mode" - fileutil.make_dirs(basedir) - e = self.failUnlessRaises(ValueError, - StorageServer, basedir, "\x00" * 20, - expiration_mode="bogus") + e = self.failUnlessRaises(AssertionError, + ExpirationPolicy, enabled=True, mode="bogus") self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e)) def test_parse_duration(self): @@ -3492,46 +3365,50 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18")) self.failUnlessEqual(p("2009-03-18"), 1237334400) - def BROKEN_test_limited_history(self): - basedir = "storage/LeaseCrawler/limited_history" + def test_limited_history(self): + basedir = "storage/AccountingCrawler/limited_history" fileutil.make_dirs(basedir) server = StorageServer(basedir, "\x00" * 20) - # make it start sooner than usual. - lc = server.lease_checker - lc.slow_start = 0 - lc.cpu_slice = 500 - # create a few shares, with some leases on them + # finish as fast as possible + RETAINED = 2 + CYCLES = 4 + ac = server.get_accounting_crawler() + ac._leasedb.retained_history_entries = RETAINED + ac.slow_start = 0 + ac.cpu_slice = 500 + ac.allowed_cpu_proportion = 1.0 + ac.minimum_cycle_time = 0 + # create a few shares, with some leases on them self.make_shares(server) server.setServiceParent(self.s) - def _wait_until_15_cycles_done(): - last = lc.state["last-cycle-finished"] - if last is not None and last >= 15: - return True - if lc.timer: - lc.timer.reset(0) - return False - d = self.poll(_wait_until_15_cycles_done) - def _check(ignored): - s = lc.get_state() - h = s["history"] - self.failUnlessEqual(len(h), 10) - self.failUnlessEqual(max(h.keys()), 15) - self.failUnlessEqual(min(h.keys()), 6) - d.addCallback(_check) + d = ac.set_hook('after_cycle') + def _after_cycle(cycle): + if cycle < CYCLES: + return ac.set_hook('after_cycle').addCallback(_after_cycle) + + state = ac.get_state() + self.failUnlessIn("history", state) + h = state["history"] + self.failUnlessEqual(len(h), RETAINED) + self.failUnlessEqual(max(h.keys()), CYCLES) + self.failUnlessEqual(min(h.keys()), CYCLES-RETAINED+1) + d.addCallback(_after_cycle) + d.addBoth(self._wait_for_yield, ac) return d - def BROKEN_test_unpredictable_future(self): - basedir = "storage/LeaseCrawler/unpredictable_future" + def OFF_test_unpredictable_future(self): + basedir = "storage/AccountingCrawler/unpredictable_future" fileutil.make_dirs(basedir) server = StorageServer(basedir, "\x00" * 20) + # make it start sooner than usual. - lc = server.lease_checker - lc.slow_start = 0 - lc.cpu_slice = -1.0 # stop quickly + ac = server.get_accounting_crawler() + ac.slow_start = 0 + ac.cpu_slice = -1.0 # stop quickly self.make_shares(server) @@ -3547,11 +3424,9 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): # progress-measurer gets smart enough to count buckets (we'll # have to interrupt it even earlier, before it's finished the # first bucket). - s = lc.get_state() + s = ac.get_state() if "cycle-to-date" not in s: - d2 = fireEventually() - d2.addCallback(_check) - return d2 + return reactor.callLater(0.2, _check) self.failUnlessIn("cycle-to-date", s) self.failUnlessIn("estimated-remaining-cycle", s) self.failUnlessIn("estimated-current-cycle", s) @@ -3565,154 +3440,16 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): self.failUnlessEqual(full["actual-buckets"], None) self.failUnlessEqual(full["actual-shares"], None) self.failUnlessEqual(full["actual-diskbytes"], None) - return d - - def BROKEN_test_no_st_blocks(self): - basedir = "storage/LeaseCrawler/no_st_blocks" - fileutil.make_dirs(basedir) - ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20, - expiration_mode="age", - expiration_override_lease_duration=-1000) - # a negative expiration_time= means the "configured-" - # space-recovered counts will be non-zero, since all shares will have - # expired by then - - # make it start sooner than usual. - lc = ss.lease_checker - lc.slow_start = 0 - self.make_shares(ss) - ss.setServiceParent(self.s) - def _wait(): - return bool(lc.get_state()["last-cycle-finished"] is not None) - d = self.poll(_wait) - - def _check(ignored): - s = lc.get_state() - last = s["history"][0] - rec = last["space-recovered"] - self.failUnlessEqual(rec["configured-buckets"], 4) - self.failUnlessEqual(rec["configured-shares"], 4) - self.failUnless(rec["configured-sharebytes"] > 0, - rec["configured-sharebytes"]) - # without the .st_blocks field in os.stat() results, we should be - # reporting diskbytes==sharebytes - self.failUnlessEqual(rec["configured-sharebytes"], - rec["configured-diskbytes"]) d.addCallback(_check) return d - def BROKEN_test_share_corruption(self): - self._poll_should_ignore_these_errors = [ - UnknownMutableContainerVersionError, - UnknownImmutableContainerVersionError, - ] - basedir = "storage/LeaseCrawler/share_corruption" - fileutil.make_dirs(basedir) - ss = InstrumentedStorageServer(basedir, "\x00" * 20) - w = StorageStatus(ss) - # make it start sooner than usual. - lc = ss.lease_checker - lc.stop_after_first_bucket = True - lc.slow_start = 0 - lc.cpu_slice = 500 - - # create a few shares, with some leases on them - self.make_shares(ss) - - # now corrupt one, and make sure the lease-checker keeps going - [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis - first = min(self.sis) - first_b32 = base32.b2a(first) - fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0") - f = open(fn, "rb+") - f.seek(0) - f.write("BAD MAGIC") - f.close() - # if get_share_file() doesn't see the correct mutable magic, it - # assumes the file is an immutable share, and then - # immutable.ShareFile sees a bad version. So regardless of which kind - # of share we corrupted, this will trigger an - # UnknownImmutableContainerVersionError. - - # also create an empty bucket - empty_si = base32.b2a("\x04"*16) - empty_bucket_dir = os.path.join(ss.sharedir, - storage_index_to_dir(empty_si)) - fileutil.make_dirs(empty_bucket_dir) - - ss.setServiceParent(self.s) - - d = fireEventually() - - # now examine the state right after the first bucket has been - # processed. - def _after_first_bucket(ignored): - s = lc.get_state() - if "cycle-to-date" not in s: - d2 = fireEventually() - d2.addCallback(_after_first_bucket) - return d2 - so_far = s["cycle-to-date"] - rec = so_far["space-recovered"] - self.failUnlessEqual(rec["examined-buckets"], 1) - self.failUnlessEqual(rec["examined-shares"], 0) - self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)]) - d.addCallback(_after_first_bucket) - - d.addCallback(lambda ign: self.render_json(w)) - def _check_json(json): - data = simplejson.loads(json) - # grr. json turns all dict keys into strings. - so_far = data["lease-checker"]["cycle-to-date"] - corrupt_shares = so_far["corrupt-shares"] - # it also turns all tuples into lists - self.failUnlessEqual(corrupt_shares, [[first_b32, 0]]) - d.addCallback(_check_json) - d.addCallback(lambda ign: self.render1(w)) - def _check_html(html): - s = remove_tags(html) - self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s) - d.addCallback(_check_html) - - def _wait(): - return bool(lc.get_state()["last-cycle-finished"] is not None) - d.addCallback(lambda ign: self.poll(_wait)) - - def _after_first_cycle(ignored): - s = lc.get_state() - last = s["history"][0] - rec = last["space-recovered"] - self.failUnlessEqual(rec["examined-buckets"], 5) - self.failUnlessEqual(rec["examined-shares"], 3) - self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)]) - d.addCallback(_after_first_cycle) - d.addCallback(lambda ign: self.render_json(w)) - def _check_json_history(json): - data = simplejson.loads(json) - last = data["lease-checker"]["history"]["0"] - corrupt_shares = last["corrupt-shares"] - self.failUnlessEqual(corrupt_shares, [[first_b32, 0]]) - d.addCallback(_check_json_history) - d.addCallback(lambda ign: self.render1(w)) - def _check_html_history(html): - s = remove_tags(html) - self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s) - d.addCallback(_check_html_history) - - def _cleanup(res): - self.flushLoggedErrors(UnknownMutableContainerVersionError, - UnknownImmutableContainerVersionError) - return res - d.addBoth(_cleanup) - return d - def render_json(self, page): d = self.render1(page, args={"t": ["json"]}) return d -class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): +class WebStatus(unittest.TestCase, WebRenderingMixin): def setUp(self): self.s = service.MultiService() self.s.startService() diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 1f46cbf9..0e53e86b 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -15,7 +15,6 @@ from nevow.util import escapeToXML from nevow import rend from allmydata import interfaces, uri, webish, dirnode -from allmydata.storage.shares import get_share_file from allmydata.storage_client import StorageFarmBroker, StubServer from allmydata.immutable import upload from allmydata.immutable.downloader.status import DownloadStatus @@ -39,6 +38,8 @@ from allmydata.test.common_web import HTTPClientGETFactory, \ HTTPClientHEADFactory from allmydata.client import Client, SecretHolder from allmydata.introducer import IntroducerNode +from allmydata.storage.expiration import ExpirationPolicy + # create a fake uploader/downloader, and a couple of fake dirnodes, then # create a webserver that works against them @@ -202,7 +203,7 @@ class FakeBucketCounter(object): "cycle-in-progress": False, "remaining-wait-time": 0} -class FakeLeaseChecker(object): +class FakeAccountingCrawler(object): def __init__(self): self.expiration_enabled = False self.mode = "age" @@ -222,9 +223,23 @@ class FakeStorageServer(service.MultiService): self.my_nodeid = nodeid self.nickname = nickname self.bucket_counter = FakeBucketCounter() - self.lease_checker = FakeLeaseChecker() + self.accounting_crawler = FakeAccountingCrawler() + self.accountant = FakeAccountant() + self.expiration_policy = ExpirationPolicy(enabled=False) def get_stats(self): return {"storage_server.accepting_immutable_shares": False} + def get_nodeid(self): + return self.my_nodeid + def get_bucket_counter(self): + return self.bucket_counter + def get_accounting_crawler(self): + return self.accounting_crawler + def get_expiration_policy(self): + return self.expiration_policy + +class FakeAccountant: + def get_all_accounts(self): + return [] class FakeClient(Client): def __init__(self): @@ -254,7 +269,9 @@ class FakeClient(Client): None, None, None) self.nodemaker.all_contents = self.all_contents self.mutable_file_default = SDMF_VERSION - self.addService(FakeStorageServer(self.nodeid, self.nickname)) + server = FakeStorageServer(self.nodeid, self.nickname) + self.accountant = server.accountant + self.addService(server) def get_long_nodeid(self): return "v0-nodeid" @@ -5323,25 +5340,24 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi d.addErrback(self.explain_web_error) return d - def _count_leases(self, ignored, which): + def _assert_leasecount(self, ignored, which, expected): u = self.uris[which] - shares = self.find_uri_shares(u) - lease_counts = [] - for shnum, serverid, fn in shares: - sf = get_share_file(fn) - num_leases = len(list(sf.get_leases())) - lease_counts.append( (fn, num_leases) ) - return lease_counts - - def _assert_leasecount(self, lease_counts, expected): - for (fn, num_leases) in lease_counts: - if num_leases != expected: - self.fail("expected %d leases, have %d, on %s" % - (expected, num_leases, fn)) + si = uri.from_string(u).get_storage_index() + num_leases = 0 + for server in self.g.servers_by_number.values(): + ss = server.get_accountant().get_anonymous_account() + ss2 = server.get_accountant().get_starter_account() + num_leases += len(ss.get_leases(si)) + len(ss2.get_leases(si)) + + if num_leases != expected: + self.fail("expected %d leases, have %d, on '%s'" % + (expected, num_leases, which)) def test_add_lease(self): self.basedir = "web/Grid/add_lease" - self.set_up_grid(num_clients=2) + N = 10 + self.set_up_grid(num_clients=2, num_servers=N) + c0 = self.g.clients[0] self.uris = {} DATA = "data" * 100 @@ -5365,12 +5381,9 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi self.fileurls[which] = "uri/" + urllib.quote(self.uris[which]) d.addCallback(_compute_fileurls) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + d.addCallback(self._assert_leasecount, "one", N) + d.addCallback(self._assert_leasecount, "two", N) + d.addCallback(self._assert_leasecount, "mutable", N) d.addCallback(self.CHECK, "one", "t=check") # no add-lease def _got_html_good(res): @@ -5378,63 +5391,43 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi self.failIfIn("Not Healthy", res) d.addCallback(_got_html_good) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + d.addCallback(self._assert_leasecount, "one", N) + d.addCallback(self._assert_leasecount, "two", N) + d.addCallback(self._assert_leasecount, "mutable", N) # this CHECK uses the original client, which uses the same # lease-secrets, so it will just renew the original lease d.addCallback(self.CHECK, "one", "t=check&add-lease=true") d.addCallback(_got_html_good) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + d.addCallback(self._assert_leasecount, "one", N) + d.addCallback(self._assert_leasecount, "two", N) + d.addCallback(self._assert_leasecount, "mutable", N) # this CHECK uses an alternate client, which adds a second lease d.addCallback(self.CHECK, "one", "t=check&add-lease=true", clientnum=1) d.addCallback(_got_html_good) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 2) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + #d.addCallback(self._assert_leasecount, "one", 2*N) - d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true") - d.addCallback(_got_html_good) + #d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true") + #d.addCallback(_got_html_good) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 2) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + #d.addCallback(self._assert_leasecount, "mutable", N) d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true", clientnum=1) d.addCallback(_got_html_good) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 2) - d.addCallback(self._count_leases, "two") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 2) + #d.addCallback(self._assert_leasecount, "mutable", 2*N) d.addErrback(self.explain_web_error) return d def test_deep_add_lease(self): self.basedir = "web/Grid/deep_add_lease" - self.set_up_grid(num_clients=2) + N = 10 + self.set_up_grid(num_clients=2, num_servers=N) c0 = self.g.clients[0] self.uris = {} self.fileurls = {} @@ -5469,33 +5462,24 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi self.failUnlessReallyEqual(len(units), 4+1) d.addCallback(_done) - d.addCallback(self._count_leases, "root") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + d.addCallback(self._assert_leasecount, "root", N) + d.addCallback(self._assert_leasecount, "one", N) + d.addCallback(self._assert_leasecount, "mutable", N) d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true") d.addCallback(_done) - d.addCallback(self._count_leases, "root") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 1) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 1) + d.addCallback(self._assert_leasecount, "root", N) + d.addCallback(self._assert_leasecount, "one", N) + d.addCallback(self._assert_leasecount, "mutable", N) d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true", clientnum=1) d.addCallback(_done) - d.addCallback(self._count_leases, "root") - d.addCallback(self._assert_leasecount, 2) - d.addCallback(self._count_leases, "one") - d.addCallback(self._assert_leasecount, 2) - d.addCallback(self._count_leases, "mutable") - d.addCallback(self._assert_leasecount, 2) + #d.addCallback(self._assert_leasecount, "root", 2*N) + #d.addCallback(self._assert_leasecount, "one", 2*N) + #d.addCallback(self._assert_leasecount, "mutable", 2*N) d.addErrback(self.explain_web_error) return d diff --git a/src/allmydata/web/storage.py b/src/allmydata/web/storage.py index cb3b2804..656090f3 100644 --- a/src/allmydata/web/storage.py +++ b/src/allmydata/web/storage.py @@ -3,7 +3,7 @@ import time, simplejson from nevow import rend, tags as T, inevow from allmydata.web.common import getxmlfile, abbreviate_time, get_arg from allmydata.util.abbreviate import abbreviate_space -from allmydata.util import time_format, idlib +from allmydata.util import idlib def remove_prefix(s, prefix): if not s.startswith(prefix): @@ -28,17 +28,19 @@ class StorageStatus(rend.Page): def render_JSON(self, req): req.setHeader("content-type", "text/plain") + accounting_crawler = self.storage.get_accounting_crawler() + bucket_counter = self.storage.get_bucket_counter() d = {"stats": self.storage.get_stats(), - "bucket-counter": self.storage.bucket_counter.get_state(), - "lease-checker": self.storage.lease_checker.get_state(), - "lease-checker-progress": self.storage.lease_checker.get_progress(), + "bucket-counter": bucket_counter.get_state(), + "lease-checker": accounting_crawler.get_state(), + "lease-checker-progress": accounting_crawler.get_progress(), } return simplejson.dumps(d, indent=1) + "\n" def data_nickname(self, ctx, storage): return self.nickname def data_nodeid(self, ctx, storage): - return idlib.nodeid_b2a(self.storage.my_nodeid) + return idlib.nodeid_b2a(self.storage.get_nodeid()) def render_storage_running(self, ctx, storage): if storage: @@ -93,14 +95,14 @@ class StorageStatus(rend.Page): return d def data_last_complete_bucket_count(self, ctx, data): - s = self.storage.bucket_counter.get_state() + s = self.storage.get_bucket_counter().get_state() count = s.get("last-complete-bucket-count") if count is None: return "Not computed yet" return count def render_count_crawler_status(self, ctx, storage): - p = self.storage.bucket_counter.get_progress() + p = self.storage.get_bucket_counter().get_progress() return ctx.tag[self.format_crawler_progress(p)] def format_crawler_progress(self, p): @@ -129,28 +131,12 @@ class StorageStatus(rend.Page): cycletime_s] def render_lease_expiration_enabled(self, ctx, data): - lc = self.storage.lease_checker - if lc.expiration_enabled: - return ctx.tag["Enabled: expired leases will be removed"] - else: - return ctx.tag["Disabled: scan-only mode, no leases will be removed"] + ep = self.storage.get_expiration_policy() + return ctx.tag[ep.describe_enabled()] def render_lease_expiration_mode(self, ctx, data): - lc = self.storage.lease_checker - if lc.mode == "age": - if lc.override_lease_duration is None: - ctx.tag["Leases will expire naturally, probably 31 days after " - "creation or renewal."] - else: - ctx.tag["Leases created or last renewed more than %s ago " - "will be considered expired." - % abbreviate_time(lc.override_lease_duration)] - else: - assert lc.mode == "cutoff-date" - localizedutcdate = time.strftime("%d-%b-%Y", time.gmtime(lc.cutoff_date)) - isoutcdate = time_format.iso_utc_date(lc.cutoff_date) - ctx.tag["Leases created or last renewed before %s (%s) UTC " - "will be considered expired." % (isoutcdate, localizedutcdate, )] + ep = self.storage.get_expiration_policy() + ctx.tag[ep.describe_expiration()] return ctx.tag def format_recovered(self, sr, a): @@ -169,16 +155,16 @@ class StorageStatus(rend.Page): ) def render_lease_current_cycle_progress(self, ctx, data): - lc = self.storage.lease_checker - p = lc.get_progress() + ac = self.storage.get_accounting_crawler() + p = ac.get_progress() return ctx.tag[self.format_crawler_progress(p)] def render_lease_current_cycle_results(self, ctx, data): - lc = self.storage.lease_checker - p = lc.get_progress() + ac = self.storage.get_accounting_crawler() + p = ac.get_progress() if not p["cycle-in-progress"]: return "" - s = lc.get_state() + s = ac.get_state() so_far = s["cycle-to-date"] sr = so_far["space-recovered"] er = s["estimated-remaining-cycle"] @@ -220,8 +206,8 @@ class StorageStatus(rend.Page): return ctx.tag["Current cycle:", p] def render_lease_last_cycle_results(self, ctx, data): - lc = self.storage.lease_checker - h = lc.get_state()["history"] + ac = self.storage.get_accounting_crawler() + h = ac.get_state()["history"] if not h: return "" last = h[max(h.keys())]