from twisted.trial import unittest
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from twisted.application import service
from foolscap.api import fireEventually
import itertools
+
from allmydata import interfaces
-from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
+from allmydata.util import fileutil, hashutil, base32, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
-from allmydata.storage.immutable import BucketWriter, BucketReader
+from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
-from allmydata.storage.lease import LeaseInfo
-from allmydata.storage.expirer import LeaseCheckingCrawler
+from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE
+from allmydata.storage.expiration import ExpirationPolicy
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
class Marker:
pass
+
+class FakeAccount:
+ def add_share(self, storage_index, shnum, used_space, sharetype, commit=True):
+ pass
+ def add_or_renew_default_lease(self, storage_index, shnum, commit=True):
+ pass
+ def mark_share_as_stable(self, storage_index, shnum, used_space, commit=True):
+ pass
+
class FakeCanary:
def __init__(self, ignore_disconnectors=False):
self.ignore = ignore_disconnectors
def register_producer(self, producer):
pass
-class Bucket(unittest.TestCase):
- def make_workdir(self, name):
- basedir = os.path.join("storage", "Bucket", name)
- incoming = os.path.join(basedir, "tmp", "bucket")
- final = os.path.join(basedir, "bucket")
- fileutil.make_dirs(basedir)
- fileutil.make_dirs(os.path.join(basedir, "tmp"))
- return incoming, final
+class BucketTestMixin:
def bucket_writer_closed(self, bw, consumed):
pass
def add_latency(self, category, latency):
def count(self, name, delta=1):
pass
- def make_lease(self):
- owner_num = 0
- renew_secret = os.urandom(32)
- cancel_secret = os.urandom(32)
- expiration_time = time.time() + 5000
- return LeaseInfo(owner_num, renew_secret, cancel_secret,
- expiration_time, "\x00" * 20)
+
+class Bucket(BucketTestMixin, unittest.TestCase):
+ def make_workdir(self, name):
+ basedir = os.path.join("storage", "Bucket", name)
+ incoming = os.path.join(basedir, "tmp", "bucket")
+ final = os.path.join(basedir, "bucket")
+ fileutil.make_dirs(basedir)
+ fileutil.make_dirs(os.path.join(basedir, "tmp"))
+ return incoming, final
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
- FakeCanary())
- bw.remote_write(0, "a"*25)
+ bw = BucketWriter(self, FakeAccount(), "si1", 0, incoming, final, 200, FakeCanary())
+ bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*25)
bw.remote_write(75, "d"*7)
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
- FakeCanary())
- bw.remote_write(0, "a"*25)
+ bw = BucketWriter(self, FakeAccount(), "si1", 0, incoming, final, 200, FakeCanary())
+ bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*7) # last block may be short
bw.remote_close()
# now read from it
br = BucketReader(self, bw.finalhome)
- self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
+ self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
- self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
+ self.failUnlessEqual(br.remote_read(50, 7 ), "c"*7 )
def test_read_past_end_of_share_data(self):
# test vector for immutable files (hard-coded contents of an immutable share
# file):
- # The following immutable share file content is identical to that
- # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
- # with share data == 'a'. The total size of this content is 85
- # bytes.
-
containerdata = struct.pack('>LLL', 1, 1, 1)
# A Tahoe-LAFS storage client would send as the share_data a
# -- see allmydata/immutable/layout.py . This test, which is
# simulating a client, just sends 'a'.
share_data = 'a'
-
- ownernumber = struct.pack('>L', 0)
- renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
- assert len(renewsecret) == 32
- cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
- assert len(cancelsecret) == 32
- expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
-
- lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
-
- share_file_data = containerdata + share_data + lease_data
+ extra_data = 'b' * ShareFile.LEASE_SIZE
+ share_file_data = containerdata + share_data + extra_data
incoming, final = self.make_workdir("test_read_past_end_of_share_data")
self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
- # Read past the end of share data to get the cancel secret.
- read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
-
- result_of_read = br.remote_read(0, read_length)
- self.failUnlessEqual(result_of_read, share_data)
-
+ # Read past the end of share data by 1 byte.
result_of_read = br.remote_read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data)
return defer.maybeDeferred(_call)
-class BucketProxy(unittest.TestCase):
+class BucketProxy(BucketTestMixin, unittest.TestCase):
def make_bucket(self, name, size):
basedir = os.path.join("storage", "BucketProxy", name)
incoming = os.path.join(basedir, "tmp", "bucket")
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
- bw = BucketWriter(self, incoming, final, size, self.make_lease(),
- FakeCanary())
+ si = "si1"
+ bw = BucketWriter(self, FakeAccount(), si, 0, incoming, final, size, FakeCanary())
rb = RemoteBucket()
rb.target = bw
return bw, rb, final
- def make_lease(self):
- owner_num = 0
- renew_secret = os.urandom(32)
- cancel_secret = os.urandom(32)
- expiration_time = time.time() + 5000
- return LeaseInfo(owner_num, renew_secret, cancel_secret,
- expiration_time, "\x00" * 20)
-
- def bucket_writer_closed(self, bw, consumed):
- pass
- def add_latency(self, category, latency):
- pass
- def count(self, name, delta=1):
- pass
-
def test_create(self):
bw, rb, sharefname = self.make_bucket("test_create", 500)
bp = WriteBucketProxy(rb, None,
self.create("test_create")
def test_declares_fixed_1528(self):
- ss = self.create("test_declares_fixed_1528")
- ver = ss.remote_get_version()
+ server = self.create("test_declares_fixed_1528")
+ aa = server.get_accountant().get_anonymous_account()
+
+ ver = aa.remote_get_version()
sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
def test_declares_maximum_share_sizes(self):
- ss = self.create("test_declares_maximum_share_sizes")
- ver = ss.remote_get_version()
+ server = self.create("test_declares_maximum_share_sizes")
+ aa = server.get_accountant().get_anonymous_account()
+
+ ver = aa.remote_get_version()
sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn('maximum-immutable-share-size', sv1)
self.failUnlessIn('maximum-mutable-share-size', sv1)
- def allocate(self, ss, storage_index, sharenums, size, canary=None):
+ def allocate(self, aa, storage_index, sharenums, size, canary=None):
renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
if not canary:
canary = FakeCanary()
- return ss.remote_allocate_buckets(storage_index,
+ return aa.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
sharenums, size, canary)
if avail <= 4*2**30:
raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
- ss = self.create("test_large_share")
+ server = self.create("test_large_share")
+ aa = server.get_accountant().get_anonymous_account()
- already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
+ already,writers = self.allocate(aa, "allocate", [0], 2**32+2)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0]))
bucket.remote_write(2**32, "ab")
bucket.remote_close()
- readers = ss.remote_get_buckets("allocate")
+ readers = aa.remote_get_buckets("allocate")
reader = readers[shnum]
self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
share lots of leading bits with an extant share (but isn't the exact
same storage index), this won't add an entry to the share directory.
"""
- ss = self.create("test_dont_overfill_dirs")
- already, writers = self.allocate(ss, "storageindex", [0], 10)
+ server = self.create("test_dont_overfill_dirs")
+ aa = server.get_accountant().get_anonymous_account()
+
+ already, writers = self.allocate(aa, "storageindex", [0], 10)
for i, wb in writers.items():
wb.remote_write(0, "%10d" % i)
wb.remote_close()
# Now store another one under another storageindex that has leading
# chars the same as the first storageindex.
- already, writers = self.allocate(ss, "storageindey", [0], 10)
+ already, writers = self.allocate(aa, "storageindey", [0], 10)
for i, wb in writers.items():
wb.remote_write(0, "%10d" % i)
wb.remote_close()
self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
def test_remove_incoming(self):
- ss = self.create("test_remove_incoming")
- already, writers = self.allocate(ss, "vid", range(3), 10)
+ server = self.create("test_remove_incoming")
+ aa = server.get_accountant().get_anonymous_account()
+
+ already, writers = self.allocate(aa, "vid", range(3), 10)
for i,wb in writers.items():
wb.remote_write(0, "%10d" % i)
wb.remote_close()
# remote_abort, when called on a writer, should make sure that
# the allocated size of the bucket is not counted by the storage
# server when accounting for space.
- ss = self.create("test_abort")
- already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
- self.failIfEqual(ss.allocated_size(), 0)
+ server = self.create("test_abort")
+ aa = server.get_accountant().get_anonymous_account()
+
+ already, writers = self.allocate(aa, "allocate", [0, 1, 2], 150)
+ self.failIfEqual(server.allocated_size(), 0)
# Now abort the writers.
for writer in writers.itervalues():
writer.remote_abort()
- self.failUnlessEqual(ss.allocated_size(), 0)
-
+ self.failUnlessEqual(server.allocated_size(), 0)
def test_allocate(self):
- ss = self.create("test_allocate")
+ server = self.create("test_allocate")
+ aa = server.get_accountant().get_anonymous_account()
- self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
+ self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})
- already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
+ already,writers = self.allocate(aa, "allocate", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# while the buckets are open, they should not count as readable
- self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
+ self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})
# close the buckets
for i,wb in writers.items():
wb.remote_abort()
# now they should be readable
- b = ss.remote_get_buckets("allocate")
+ b = aa.remote_get_buckets("allocate")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
b_str = str(b[0])
# now if we ask about writing again, the server should offer those
# three buckets as already present. It should offer them even if we
# don't ask about those specific ones.
- already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
+ already,writers = self.allocate(aa, "allocate", [2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# while those two buckets are open for writing, the server should
# refuse to offer them to uploaders
- already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+ already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
self.failUnlessEqual(already2, set([0,1,2]))
self.failUnlessEqual(set(writers2.keys()), set([5]))
# aborting the writes should remove the tempfiles
for i,wb in writers2.items():
wb.remote_abort()
- already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+ already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
self.failUnlessEqual(already2, set([0,1,2]))
self.failUnlessEqual(set(writers2.keys()), set([5]))
wb.remote_abort()
def test_bad_container_version(self):
- ss = self.create("test_bad_container_version")
- a,w = self.allocate(ss, "si1", [0], 10)
+ server = self.create("test_bad_container_version")
+ aa = server.get_accountant().get_anonymous_account()
+
+ a,w = self.allocate(aa, "si1", [0], 10)
w[0].remote_write(0, "\xff"*10)
w[0].remote_close()
- fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
+ fn = os.path.join(server.sharedir, storage_index_to_dir("si1"), "0")
f = open(fn, "rb+")
f.seek(0)
f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
f.close()
- ss.remote_get_buckets("allocate")
+ aa.remote_get_buckets("allocate")
e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
- ss.remote_get_buckets, "si1")
+ aa.remote_get_buckets, "si1")
self.failUnlessIn(" had version 0 but we wanted 1", str(e))
def test_disconnect(self):
# simulate a disconnection
- ss = self.create("test_disconnect")
+ server = self.create("test_disconnect")
+ aa = server.get_accountant().get_anonymous_account()
+
canary = FakeCanary()
- already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
+ already,writers = self.allocate(aa, "disconnect", [0,1,2], 75, canary)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for (f,args,kwargs) in canary.disconnectors.values():
del writers
# that ought to delete the incoming shares
- already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
+ already,writers = self.allocate(aa, "disconnect", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
'avail': max(15000 - reserved_space, 0),
}
- ss = self.create("test_reserved_space", reserved_space=reserved_space)
+ server = self.create("test_reserved_space", reserved_space=reserved_space)
+ aa = server.get_accountant().get_anonymous_account()
+
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
canary = FakeCanary(True)
- already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
+ already,writers = self.allocate(aa, "vid1", [0,1,2], 1000, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
- self.failUnlessEqual(len(ss._active_writers), 3)
+ self.failUnlessEqual(len(server._active_writers), 3)
# allocating 1001-byte shares only leaves room for one
- already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
+ already2,writers2 = self.allocate(aa, "vid2", [0,1,2], 1001, canary)
self.failUnlessEqual(len(writers2), 1)
- self.failUnlessEqual(len(ss._active_writers), 4)
+ self.failUnlessEqual(len(server._active_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
del already
del writers
- self.failUnlessEqual(len(ss._active_writers), 1)
+ self.failUnlessEqual(len(server._active_writers), 1)
# now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should
del already2
del writers2
del bw
- self.failUnlessEqual(len(ss._active_writers), 0)
+ self.failUnlessEqual(len(server._active_writers), 0)
allocated = 1001 + OVERHEAD + LEASE_SIZE
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
- already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
+ already3,writers3 = self.allocate(aa, "vid3", range(100), 100, canary)
self.failUnlessEqual(len(writers3), 39)
- self.failUnlessEqual(len(ss._active_writers), 39)
+ self.failUnlessEqual(len(server._active_writers), 39)
del already3
del writers3
- self.failUnlessEqual(len(ss._active_writers), 0)
- ss.disownServiceParent()
- del ss
+ self.failUnlessEqual(len(server._active_writers), 0)
+ server.disownServiceParent()
+ del server
def test_seek(self):
basedir = self.workdir("test_seek_behavior")
fileutil.make_dirs(basedir)
filename = os.path.join(basedir, "testfile")
- f = open(filename, "wb")
- f.write("start")
- f.close()
+ fileutil.write(filename, "start")
+
# mode="w" allows seeking-to-create-holes, but truncates pre-existing
# files. mode="a" preserves previous contents but does not allow
# seeking-to-create-holes. mode="r+" allows both.
f2 = open(filename, "rb")
self.failUnlessEqual(f2.read(5), "start")
+ def compare_leases(self, leases_a, leases_b, with_timestamps=True):
+ self.failUnlessEqual(len(leases_a), len(leases_b))
+ for i in range(len(leases_a)):
+ a = leases_a[i]
+ b = leases_b[i]
+ self.failUnlessEqual(a.owner_num, b.owner_num)
+ if with_timestamps:
+ self.failUnlessEqual(a.renewal_time, b.renewal_time)
+ self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
- ss = self.create("test_leases")
+ server = self.create("test_leases")
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
canary = FakeCanary()
sharenums = range(5)
size = 100
- rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
+ # create a random non-numeric file in the bucket directory, to
+ # exercise the code that's supposed to ignore those.
+ bucket_dir = os.path.join(self.workdir("test_leases"),
+ "shares", storage_index_to_dir("six"))
+ os.makedirs(bucket_dir)
+ fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
+ "you ought to be ignoring me\n")
+
+ already,writers = aa.remote_allocate_buckets("si1", "", "",
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
for wb in writers.values():
wb.remote_close()
- leases = list(ss.get_leases("si0"))
- self.failUnlessEqual(len(leases), 1)
- self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
+ leases = aa.get_leases("si1")
+ self.failUnlessEqual(len(leases), 5)
- rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
- sharenums, size, canary)
- for wb in writers.values():
- wb.remote_close()
+ aa.add_share("six", 0, 0, SHARETYPE_IMMUTABLE)
+ # adding a share does not immediately add a lease
+ self.failUnlessEqual(len(aa.get_leases("six")), 0)
- # take out a second lease on si1
- rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
- sharenums, size, canary)
- self.failUnlessEqual(len(already), 5)
- self.failUnlessEqual(len(writers), 0)
+ aa.add_or_renew_default_lease("six", 0)
+ self.failUnlessEqual(len(aa.get_leases("six")), 1)
- leases = list(ss.get_leases("si1"))
- self.failUnlessEqual(len(leases), 2)
- self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
+ # add-lease on a missing storage index is silently ignored
+ self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None)
+ self.failUnlessEqual(len(aa.get_leases("si18")), 0)
- # and a third lease, using add-lease
- rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- ss.remote_add_lease("si1", rs2a, cs2a)
- leases = list(ss.get_leases("si1"))
- self.failUnlessEqual(len(leases), 3)
- self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
+ all_leases = aa.get_leases("si1")
- # add-lease on a missing storage index is silently ignored
- self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
-
- # check that si0 is readable
- readers = ss.remote_get_buckets("si0")
- self.failUnlessEqual(len(readers), 5)
-
- # renew the first lease. Only the proper renew_secret should work
- ss.remote_renew_lease("si0", rs0)
- self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
- self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
-
- # check that si0 is still readable
- readers = ss.remote_get_buckets("si0")
- self.failUnlessEqual(len(readers), 5)
-
- # There is no such method as remote_cancel_lease for now -- see
- # ticket #1528.
- self.failIf(hasattr(ss, 'remote_cancel_lease'), \
- "ss should not have a 'remote_cancel_lease' method/attribute")
-
- # test overlapping uploads
- rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
- hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
- already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
- sharenums, size, canary)
- self.failUnlessEqual(len(already), 0)
- self.failUnlessEqual(len(writers), 5)
- already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
- sharenums, size, canary)
- self.failUnlessEqual(len(already2), 0)
- self.failUnlessEqual(len(writers2), 0)
- for wb in writers.values():
- wb.remote_close()
+ # renew the lease directly
+ aa.remote_renew_lease("si1", "")
+ self.failUnlessEqual(len(aa.get_leases("si1")), 5)
+ self.compare_leases(all_leases, aa.get_leases("si1"), with_timestamps=False)
- leases = list(ss.get_leases("si3"))
- self.failUnlessEqual(len(leases), 1)
+ # Now allocate more leases using a different account.
+ # A new lease should be allocated for every share in the shareset.
+ sa.remote_renew_lease("si1", "")
+ self.failUnlessEqual(len(aa.get_leases("si1")), 5)
+ self.failUnlessEqual(len(sa.get_leases("si1")), 5)
- already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
- sharenums, size, canary)
- self.failUnlessEqual(len(already3), 5)
- self.failUnlessEqual(len(writers3), 0)
+ all_leases2 = sa.get_leases("si1")
- leases = list(ss.get_leases("si3"))
- self.failUnlessEqual(len(leases), 2)
+ sa.remote_renew_lease("si1", "")
+ self.compare_leases(all_leases2, sa.get_leases("si1"), with_timestamps=False)
def test_readonly(self):
workdir = self.workdir("test_readonly")
- ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
- ss.setServiceParent(self.sparent)
+ server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
+ server.setServiceParent(self.sparent)
+ aa = server.get_accountant().get_anonymous_account()
- already,writers = self.allocate(ss, "vid", [0,1,2], 75)
+ already,writers = self.allocate(aa, "vid", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(writers, {})
- stats = ss.get_stats()
+ stats = server.get_stats()
self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
if "storage_server.disk_avail" in stats:
# Some platforms may not have an API to get disk stats.
def test_advise_corruption(self):
workdir = self.workdir("test_advise_corruption")
- ss = StorageServer(workdir, "\x00" * 20)
- ss.setServiceParent(self.sparent)
+ server = StorageServer(workdir, "\x00" * 20)
+ server.setServiceParent(self.sparent)
+ aa = server.get_accountant().get_anonymous_account()
si0_s = base32.b2a("si0")
- ss.remote_advise_corrupt_share("immutable", "si0", 0,
+ aa.remote_advise_corrupt_share("immutable", "si0", 0,
"This share smells funny.\n")
reportdir = os.path.join(workdir, "corruption-advisories")
reports = os.listdir(reportdir)
# test the RIBucketWriter version too
si1_s = base32.b2a("si1")
- already,writers = self.allocate(ss, "si1", [1], 75)
+ already,writers = self.allocate(aa, "si1", [1], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([1]))
writers[1].remote_write(0, "data")
writers[1].remote_close()
- b = ss.remote_get_buckets("si1")
+ b = aa.remote_get_buckets("si1")
self.failUnlessEqual(set(b.keys()), set([1]))
b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
def cancel_secret(self, tag):
return hashutil.tagged_hash("cancel_blah", str(tag))
- def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
+ def allocate(self, aa, storage_index, we_tag, lease_tag, sharenums, size):
write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag)
- rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ rstaraw = aa.remote_slot_testv_and_readv_and_writev
testandwritev = dict( [ (shnum, ([], [], None) )
for shnum in sharenums ] )
readv = []
def test_bad_magic(self):
- ss = self.create("test_bad_magic")
- self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
- fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
+ server = self.create("test_bad_magic")
+ aa = server.get_accountant().get_anonymous_account()
+
+ self.allocate(aa, "si1", "we1", self._lease_secret.next(), set([0]), 10)
+ fn = os.path.join(server.sharedir, storage_index_to_dir("si1"), "0")
f = open(fn, "rb+")
f.seek(0)
f.write("BAD MAGIC")
f.close()
- read = ss.remote_slot_readv
+ read = aa.remote_slot_readv
e = self.failUnlessRaises(UnknownMutableContainerVersionError,
read, "si1", [0], [(0,10)])
self.failUnlessIn(" had magic ", str(e))
self.failUnlessIn(" but we wanted ", str(e))
def test_container_size(self):
- ss = self.create("test_container_size")
- self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+ server = self.create("test_container_size")
+ aa = server.get_accountant().get_anonymous_account()
+
+ self.allocate(aa, "si1", "we1", self._lease_secret.next(),
set([0,1,2]), 100)
- read = ss.remote_slot_readv
- rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ read = aa.remote_slot_readv
+ rstaraw = aa.remote_slot_testv_and_readv_and_writev
secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
self.cancel_secret("we1") )
# Also see if the server explicitly declares that it supports this
# feature.
- ver = ss.remote_get_version()
+ ver = aa.remote_get_version()
storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
self.failUnlessEqual(read_answer, {})
def test_allocate(self):
- ss = self.create("test_allocate")
- self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+ server = self.create("test_allocate")
+ aa = server.get_accountant().get_anonymous_account()
+
+ self.allocate(aa, "si1", "we1", self._lease_secret.next(),
set([0,1,2]), 100)
- read = ss.remote_slot_readv
+ read = aa.remote_slot_readv
self.failUnlessEqual(read("si1", [0], [(0, 10)]),
{0: [""]})
self.failUnlessEqual(read("si1", [], [(0, 10)]),
self.renew_secret("we1"),
self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
+ write = aa.remote_slot_testv_and_readv_and_writev
answer = write("si1", secrets,
{0: ([], [(0,data)], None)},
[])
def test_operators(self):
# test operators, the data we're comparing is '11111' in all cases.
# test both fail+pass, reset data after each one.
- ss = self.create("test_operators")
+ server = self.create("test_operators")
+ aa = server.get_accountant().get_anonymous_account()
secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
+ write = aa.remote_slot_testv_and_readv_and_writev
+ read = aa.remote_slot_readv
def reset():
write("si1", secrets,
reset()
def test_readv(self):
- ss = self.create("test_readv")
+ server = self.create("test_readv")
+ aa = server.get_accountant().get_anonymous_account()
+
secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
+ write = aa.remote_slot_testv_and_readv_and_writev
+ read = aa.remote_slot_readv
data = [("%d" % i) * 100 for i in range(3)]
rc = write("si1", secrets,
{0: ([], [(0,data[0])], None),
1: ["1"*10],
2: ["2"*10]})
- def compare_leases_without_timestamps(self, leases_a, leases_b):
- self.failUnlessEqual(len(leases_a), len(leases_b))
- for i in range(len(leases_a)):
- a = leases_a[i]
- b = leases_b[i]
- self.failUnlessEqual(a.owner_num, b.owner_num)
- self.failUnlessEqual(a.renew_secret, b.renew_secret)
- self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
- self.failUnlessEqual(a.nodeid, b.nodeid)
-
- def compare_leases(self, leases_a, leases_b):
+ def compare_leases(self, leases_a, leases_b, with_timestamps=True):
self.failUnlessEqual(len(leases_a), len(leases_b))
for i in range(len(leases_a)):
a = leases_a[i]
b = leases_b[i]
- self.failUnlessEqual(a.owner_num, b.owner_num)
- self.failUnlessEqual(a.renew_secret, b.renew_secret)
- self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
- self.failUnlessEqual(a.nodeid, b.nodeid)
- self.failUnlessEqual(a.expiration_time, b.expiration_time)
+ self.failUnlessEqual(a.owner_num, b.owner_num)
+ if with_timestamps:
+ self.failUnlessEqual(a.renewal_time, b.renewal_time)
+ self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
- ss = self.create("test_leases")
+ server = self.create("test_leases")
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
def secrets(n):
return ( self.write_enabler("we1"),
self.renew_secret("we1-%d" % n),
self.cancel_secret("we1-%d" % n) )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
- rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
+ write = aa.remote_slot_testv_and_readv_and_writev
+ write2 = sa.remote_slot_testv_and_readv_and_writev
+ read = aa.remote_slot_readv
+ rc = write("si0", secrets(0), {0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(rc, (True, {}))
# create a random non-numeric file in the bucket directory, to
# exercise the code that's supposed to ignore those.
bucket_dir = os.path.join(self.workdir("test_leases"),
- "shares", storage_index_to_dir("si1"))
- f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
- f.write("you ought to be ignoring me\n")
- f.close()
+ "shares", storage_index_to_dir("six"))
+ os.makedirs(bucket_dir)
+ fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
+ "you ought to be ignoring me\n")
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
- self.failUnlessEqual(len(list(s0.get_leases())), 1)
+ s0.create("nodeid", secrets(0)[0])
+
+ aa.add_share("six", 0, 0, SHARETYPE_MUTABLE)
+ # adding a share does not immediately add a lease
+ self.failUnlessEqual(len(aa.get_leases("six")), 0)
+
+ aa.add_or_renew_default_lease("six", 0)
+ self.failUnlessEqual(len(aa.get_leases("six")), 1)
# add-lease on a missing storage index is silently ignored
- self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
+ self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None)
+ self.failUnlessEqual(len(aa.get_leases("si18")), 0)
- # re-allocate the slots and use the same secrets, that should update
- # the lease
+ # update the lease by writing
write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
- self.failUnlessEqual(len(list(s0.get_leases())), 1)
+ self.failUnlessEqual(len(aa.get_leases("si1")), 1)
# renew it directly
- ss.remote_renew_lease("si1", secrets(0)[1])
- self.failUnlessEqual(len(list(s0.get_leases())), 1)
-
- # now allocate them with a bunch of different secrets, to trigger the
- # extended lease code. Use add_lease for one of them.
- write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
- self.failUnlessEqual(len(list(s0.get_leases())), 2)
- secrets2 = secrets(2)
- ss.remote_add_lease("si1", secrets2[1], secrets2[2])
- self.failUnlessEqual(len(list(s0.get_leases())), 3)
- write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
- write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
- write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
-
- self.failUnlessEqual(len(list(s0.get_leases())), 6)
-
- all_leases = list(s0.get_leases())
- # and write enough data to expand the container, forcing the server
- # to move the leases
- write("si1", secrets(0),
- {0: ([], [(0,data)], 200), },
- [])
-
- # read back the leases, make sure they're still intact.
- self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
-
- ss.remote_renew_lease("si1", secrets(0)[1])
- ss.remote_renew_lease("si1", secrets(1)[1])
- ss.remote_renew_lease("si1", secrets(2)[1])
- ss.remote_renew_lease("si1", secrets(3)[1])
- ss.remote_renew_lease("si1", secrets(4)[1])
- self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
+ aa.remote_renew_lease("si1", secrets(0)[1])
+ self.failUnlessEqual(len(aa.get_leases("si1")), 1)
+
+ # now allocate another lease using a different account
+ write2("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
+ self.failUnlessEqual(len(aa.get_leases("si1")), 1)
+ self.failUnlessEqual(len(sa.get_leases("si1")), 1)
+
+ aa_leases = aa.get_leases("si1")
+ sa_leases = sa.get_leases("si1")
+
+ aa.remote_renew_lease("si1", secrets(0)[1])
+ self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False)
+
+ sa.remote_renew_lease("si1", secrets(1)[1])
+ self.compare_leases(sa_leases, sa.get_leases("si1"), with_timestamps=False)
+
# get a new copy of the leases, with the current timestamps. Reading
- # data and failing to renew/cancel leases should leave the timestamps
- # alone.
- all_leases = list(s0.get_leases())
- # renewing with a bogus token should prompt an error message
-
- # examine the exception thus raised, make sure the old nodeid is
- # present, to provide for share migration
- e = self.failUnlessRaises(IndexError,
- ss.remote_renew_lease, "si1",
- secrets(20)[1])
- e_s = str(e)
- self.failUnlessIn("Unable to renew non-existent lease", e_s)
- self.failUnlessIn("I have leases accepted by nodeids:", e_s)
- self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
-
- self.compare_leases(all_leases, list(s0.get_leases()))
+ # data should leave the timestamps alone.
+ aa_leases = aa.get_leases("si1")
# reading shares should not modify the timestamp
read("si1", [], [(0,200)])
- self.compare_leases(all_leases, list(s0.get_leases()))
+ self.compare_leases(aa_leases, aa.get_leases("si1"))
write("si1", secrets(0),
{0: ([], [(200, "make me bigger")], None)}, [])
- self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
+ self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False)
write("si1", secrets(0),
{0: ([], [(500, "make me really bigger")], None)}, [])
- self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
+ self.compare_leases(aa_leases, aa.get_leases("si1"), with_timestamps=False)
def test_remove(self):
- ss = self.create("test_remove")
- self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+ server = self.create("test_remove")
+ aa = server.get_accountant().get_anonymous_account()
+
+ self.allocate(aa, "si1", "we1", self._lease_secret.next(),
set([0,1,2]), 100)
- readv = ss.remote_slot_readv
- writev = ss.remote_slot_testv_and_readv_and_writev
+ readv = aa.remote_slot_readv
+ writev = aa.remote_slot_testv_and_readv_and_writev
secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
self.cancel_secret("we1") )
def setUp(self):
self.sparent = LoggingServiceParent()
self._lease_secret = itertools.count()
- self.ss = self.create("MDMFProxies storage test server")
+ self.aa = self.create("MDMFProxies storage test server")
self.rref = RemoteBucket()
- self.rref.target = self.ss
+ self.rref.target = self.aa
self.secrets = (self.write_enabler("we_secret"),
self.renew_secret("renew_secret"),
self.cancel_secret("cancel_secret"))
workdir = self.workdir(name)
server = StorageServer(workdir, "\x00" * 20)
server.setServiceParent(self.sparent)
- return server
+ return server.get_accountant().get_anonymous_account()
def build_test_mdmf_share(self, tail_segment=False, empty=False):
# Start with the checkstring
tail_segment=False,
empty=False):
"""
- I write some data for the read tests to read to self.ss
+ I write some data for the read tests to read to self.aa
If tail_segment=True, then I will write a share that has a
smaller tail segment than other segments.
"""
- write = self.ss.remote_slot_testv_and_readv_and_writev
+ write = self.aa.remote_slot_testv_and_readv_and_writev
data = self.build_test_mdmf_share(tail_segment, empty)
# Finally, we write the whole thing to the storage server in one
# pass.
results = write(storage_index, self.secrets, tws, readv)
self.failUnless(results[0])
-
def build_test_sdmf_share(self, empty=False):
if empty:
sharedata = ""
# Some tests need SDMF shares to verify that we can still
# read them. This method writes one, which resembles but is not
assert self.rref
- write = self.ss.remote_slot_testv_and_readv_and_writev
+ write = self.aa.remote_slot_testv_and_readv_and_writev
share = self.build_test_sdmf_share(empty)
testvs = [(0, 1, "eq", "")]
tws = {}
# blocks.
mw = self._make_new_mw("si1", 0)
# Test writing some blocks.
- read = self.ss.remote_slot_readv
+ read = self.aa.remote_slot_readv
expected_private_key_offset = struct.calcsize(MDMFHEADER)
expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
PRIVATE_KEY_SIZE + \
d = sdmfr.finish_publishing()
def _then(ignored):
self.failUnlessEqual(self.rref.write_count, 1)
- read = self.ss.remote_slot_readv
+ read = self.aa.remote_slot_readv
self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
{0: [data]})
d.addCallback(_then)
sdmfw.finish_publishing())
def _then_again(results):
self.failUnless(results[0])
- read = self.ss.remote_slot_readv
+ read = self.aa.remote_slot_readv
self.failUnlessEqual(read("si1", [0], [(1, 8)]),
{0: [struct.pack(">Q", 1)]})
self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
d.addBoth(self._wait_for_yield, bucket_counter)
return d
-class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
- stop_after_first_bucket = False
- def process_bucket(self, *args, **kwargs):
- LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
- if self.stop_after_first_bucket:
- self.stop_after_first_bucket = False
- self.cpu_slice = -1.0
- def yielding(self, sleep_time):
- if not self.stop_after_first_bucket:
- self.cpu_slice = 500
-
-class BrokenStatResults:
- pass
-class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
- def stat(self, fn):
- s = os.stat(fn)
- bsr = BrokenStatResults()
- for attrname in dir(s):
- if attrname.startswith("_"):
- continue
- if attrname == "st_blocks":
- continue
- setattr(bsr, attrname, getattr(s, attrname))
- return bsr
-
-class InstrumentedStorageServer(StorageServer):
- LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
-class No_ST_BLOCKS_StorageServer(StorageServer):
- LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
-
-class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
+class AccountingCrawlerTest(unittest.TestCase, CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
def setUp(self):
self.s = service.MultiService()
self.s.startService()
return self.s.stopService()
- def make_shares(self, ss):
+ def make_shares(self, server):
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
def make(si):
return (si, hashutil.tagged_hash("renew", si),
hashutil.tagged_hash("cancel", si))
# inner contents are not a valid CHK share
data = "\xff" * 1000
- a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
+ a,w = aa.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1000, canary)
w[0].remote_write(0, data)
w[0].remote_close()
- a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
+ a,w = aa.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1000, canary)
w[0].remote_write(0, data)
w[0].remote_close()
- ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
+ sa.remote_add_lease(immutable_si_1, rs1a, cs1a)
- writev = ss.remote_slot_testv_and_readv_and_writev
+ writev = aa.remote_slot_testv_and_readv_and_writev
writev(mutable_si_2, (we2, rs2, cs2),
{0: ([], [(0,data)], len(data))}, [])
writev(mutable_si_3, (we3, rs3, cs3),
{0: ([], [(0,data)], len(data))}, [])
- ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
+ sa.remote_add_lease(mutable_si_3, rs3a, cs3a)
self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
- def BROKEN_test_basic(self):
- basedir = "storage/LeaseCrawler/basic"
+ def test_basic(self):
+ basedir = "storage/AccountingCrawler/basic"
fileutil.make_dirs(basedir)
- server = InstrumentedStorageServer(basedir, "\x00" * 20)
- # make it start sooner than usual.
- lc = server.lease_checker
- lc.slow_start = 0
- lc.cpu_slice = 500
- lc.stop_after_first_bucket = True
+ ep = ExpirationPolicy(enabled=False)
+ server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep)
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
+ # finish as fast as possible
+ ac = server.get_accounting_crawler()
+ ac.slow_start = 0
+ ac.cpu_slice = 500
+
webstatus = StorageStatus(server)
# create a few shares, with some leases on them
fn = os.path.join(server.sharedir,
storage_index_to_dir(immutable_si_0),
"not-a-share")
- f = open(fn, "wb")
- f.write("I am not a share.\n")
- f.close()
+ fileutil.write(fn, "I am not a share.\n")
# this is before the crawl has started, so we're not in a cycle yet
- initial_state = lc.get_state()
- self.failIf(lc.get_progress()["cycle-in-progress"])
+ initial_state = ac.get_state()
+ self.failIf(ac.get_progress()["cycle-in-progress"])
self.failIfIn("cycle-to-date", initial_state)
self.failIfIn("estimated-remaining-cycle", initial_state)
self.failIfIn("estimated-current-cycle", initial_state)
DAY = 24*60*60
- d = fireEventually()
+ # now examine the state right after the 'aa' prefix has been processed.
+ d = self._after_prefix(None, 'aa', ac)
+ def _after_aa_prefix(state):
+ self.failUnlessIn("cycle-to-date", state)
+ self.failUnlessIn("estimated-remaining-cycle", state)
+ self.failUnlessIn("estimated-current-cycle", state)
+ self.failUnlessIn("history", state)
+ self.failUnlessEqual(state["history"], {})
- # now examine the state right after the first bucket has been
- # processed.
- def _after_first_bucket(ignored):
- initial_state = lc.get_state()
- if "cycle-to-date" not in initial_state:
- d2 = fireEventually()
- d2.addCallback(_after_first_bucket)
- return d2
- self.failUnlessIn("cycle-to-date", initial_state)
- self.failUnlessIn("estimated-remaining-cycle", initial_state)
- self.failUnlessIn("estimated-current-cycle", initial_state)
- self.failUnlessIn("history", initial_state)
- self.failUnlessEqual(initial_state["history"], {})
-
- so_far = initial_state["cycle-to-date"]
+ so_far = state["cycle-to-date"]
self.failUnlessEqual(so_far["expiration-enabled"], False)
self.failUnlessIn("configured-expiration-mode", so_far)
self.failUnlessIn("lease-age-histogram", so_far)
self.failUnlessEqual(type(lah), list)
self.failUnlessEqual(len(lah), 1)
self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
- self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
self.failUnlessEqual(so_far["corrupt-shares"], [])
sr1 = so_far["space-recovered"]
self.failUnlessEqual(sr1["examined-buckets"], 1)
self.failUnlessEqual(sr1["examined-shares"], 1)
self.failUnlessEqual(sr1["actual-shares"], 0)
- left = initial_state["estimated-remaining-cycle"]
+ left = state["estimated-remaining-cycle"]
sr2 = left["space-recovered"]
self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
self.failIfEqual(sr2["actual-shares"], None)
- self.failIfEqual(sr2["configured-diskbytes"], None)
- self.failIfEqual(sr2["original-sharebytes"], None)
- d.addCallback(_after_first_bucket)
+ d.addCallback(_after_aa_prefix)
+
d.addCallback(lambda ign: self.render1(webstatus))
def _check_html_in_cycle(html):
s = remove_tags(html)
self.failUnlessIn("and has recovered: "
"0 shares, 0 buckets (0 mutable / 0 immutable), "
"0 B (0 B / 0 B)", s)
+
+ return ac.set_hook('after_cycle')
d.addCallback(_check_html_in_cycle)
- # wait for the crawler to finish the first cycle. Nothing should have
- # been removed.
- def _wait():
- return bool(lc.get_state()["last-cycle-finished"] is not None)
- d.addCallback(lambda ign: self.poll(_wait))
+ def _after_first_cycle(cycle):
+ # After the first cycle, nothing should have been removed.
+ self.failUnlessEqual(cycle, 0)
+ progress = ac.get_progress()
+ self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
- def _after_first_cycle(ignored):
- s = lc.get_state()
+ s = ac.get_state()
self.failIf("cycle-to-date" in s)
self.failIf("estimated-remaining-cycle" in s)
self.failIf("estimated-current-cycle" in s)
last = s["history"][0]
+ self.failUnlessEqual(type(last), dict, repr(last))
self.failUnlessIn("cycle-start-finish-times", last)
- self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
+ self.failUnlessEqual(type(last["cycle-start-finish-times"]), list, repr(last))
self.failUnlessEqual(last["expiration-enabled"], False)
self.failUnlessIn("configured-expiration-mode", last)
lah = last["lease-age-histogram"]
self.failUnlessEqual(type(lah), list)
self.failUnlessEqual(len(lah), 1)
- self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
+ self.failUnlessEqual(tuple(lah[0]), (0.0, DAY, 6) )
- self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
self.failUnlessEqual(last["corrupt-shares"], [])
rec = last["space-recovered"]
self.failUnlessEqual(rec["actual-shares"], 0)
self.failUnlessEqual(rec["actual-diskbytes"], 0)
- def _get_sharefile(si):
- return list(server._iter_share_files(si))[0]
def count_leases(si):
- return len(list(_get_sharefile(si).get_leases()))
- self.failUnlessEqual(count_leases(immutable_si_0), 1)
- self.failUnlessEqual(count_leases(immutable_si_1), 2)
- self.failUnlessEqual(count_leases(mutable_si_2), 1)
- self.failUnlessEqual(count_leases(mutable_si_3), 2)
+ return (len(aa.get_leases(si)), len(sa.get_leases(si)))
+ self.failUnlessEqual(count_leases(immutable_si_0), (1, 0))
+ self.failUnlessEqual(count_leases(immutable_si_1), (1, 1))
+ self.failUnlessEqual(count_leases(mutable_si_2), (1, 0))
+ self.failUnlessEqual(count_leases(mutable_si_3), (1, 1))
d.addCallback(_after_first_cycle)
+
d.addCallback(lambda ign: self.render1(webstatus))
- def _check_html(html):
+ def _check_html_after_cycle(html):
s = remove_tags(html)
self.failUnlessIn("recovered: 0 shares, 0 buckets "
"(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
"(2 mutable / 2 immutable),", s)
self.failUnlessIn("but expiration was not enabled", s)
- d.addCallback(_check_html)
+ d.addCallback(_check_html_after_cycle)
+
d.addCallback(lambda ign: self.render_json(webstatus))
- def _check_json(json):
+ def _check_json_after_cycle(json):
data = simplejson.loads(json)
self.failUnlessIn("lease-checker", data)
self.failUnlessIn("lease-checker-progress", data)
- d.addCallback(_check_json)
+ d.addCallback(_check_json_after_cycle)
+ d.addBoth(self._wait_for_yield, ac)
return d
- def backdate_lease(self, sf, renew_secret, new_expire_time):
- # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
- # "renew" a lease with a new_expire_time that is older than what the
- # current lease has), so we have to reach inside it.
- for i,lease in enumerate(sf.get_leases()):
- if lease.renew_secret == renew_secret:
- lease.expiration_time = new_expire_time
- f = open(sf.home, 'rb+')
- sf._write_lease_record(f, i, lease)
- f.close()
- return
- raise IndexError("unable to renew non-existent lease")
-
- def BROKEN_test_expire_age(self):
- basedir = "storage/LeaseCrawler/expire_age"
+ def test_expire_age(self):
+ basedir = "storage/AccountingCrawler/expire_age"
fileutil.make_dirs(basedir)
# setting expiration_time to 2000 means that any lease which is more
# than 2000s old will be expired.
- server = InstrumentedStorageServer(basedir, "\x00" * 20,
- expiration_enabled=True,
- expiration_mode="age",
- expiration_override_lease_duration=2000)
- # make it start sooner than usual.
- lc = server.lease_checker
- lc.slow_start = 0
- lc.stop_after_first_bucket = True
+ now = time.time()
+ ep = ExpirationPolicy(enabled=True, mode="age", override_lease_duration=2000)
+ server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep)
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
+ # finish as fast as possible
+ ac = server.get_accounting_crawler()
+ ac.slow_start = 0
+ ac.cpu_slice = 500
+
webstatus = StorageStatus(server)
# create a few shares, with some leases on them
def _get_sharefile(si):
return list(server._iter_share_files(si))[0]
def count_leases(si):
- return len(list(_get_sharefile(si).get_leases()))
+ return (len(aa.get_leases(si)), len(sa.get_leases(si)))
self.failUnlessEqual(count_shares(immutable_si_0), 1)
- self.failUnlessEqual(count_leases(immutable_si_0), 1)
+ self.failUnlessEqual(count_leases(immutable_si_0), (1, 0))
self.failUnlessEqual(count_shares(immutable_si_1), 1)
- self.failUnlessEqual(count_leases(immutable_si_1), 2)
+ self.failUnlessEqual(count_leases(immutable_si_1), (1, 1))
self.failUnlessEqual(count_shares(mutable_si_2), 1)
- self.failUnlessEqual(count_leases(mutable_si_2), 1)
+ self.failUnlessEqual(count_leases(mutable_si_2), (1, 0))
self.failUnlessEqual(count_shares(mutable_si_3), 1)
- self.failUnlessEqual(count_leases(mutable_si_3), 2)
+ self.failUnlessEqual(count_leases(mutable_si_3), (1, 1))
+
+ # artificially crank back the renewal time on the first lease of each
+ # share to 3000s ago, and set the expiration time to 31 days later.
+ new_renewal_time = now - 3000
+ new_expiration_time = new_renewal_time + 31*24*60*60
- # artificially crank back the expiration time on the first lease of
- # each share, to make it look like it expired already (age=1000s).
# Some shares have an extra lease which is set to expire at the
# default time in 31 days from now (age=31days). We then run the
# crawler, which will expire the first lease, making some shares get
# deleted and others stay alive (with one remaining lease)
- now = time.time()
- sf0 = _get_sharefile(immutable_si_0)
- self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
+ aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
# immutable_si_1 gets an extra lease
- sf1 = _get_sharefile(immutable_si_1)
- self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
+ sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
- sf2 = _get_sharefile(mutable_si_2)
- self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
+ aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time)
# mutable_si_3 gets an extra lease
- sf3 = _get_sharefile(mutable_si_3)
- self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
+ sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time)
server.setServiceParent(self.s)
- d = fireEventually()
- # examine the state right after the first bucket has been processed
- def _after_first_bucket(ignored):
- p = lc.get_progress()
- if not p["cycle-in-progress"]:
- d2 = fireEventually()
- d2.addCallback(_after_first_bucket)
- return d2
- d.addCallback(_after_first_bucket)
+ # now examine the web status right after the 'aa' prefix has been processed.
+ d = self._after_prefix(None, 'aa', ac)
d.addCallback(lambda ign: self.render1(webstatus))
def _check_html_in_cycle(html):
s = remove_tags(html)
- # the first bucket encountered gets deleted, and its prefix
+ # the first shareset encountered gets deleted, and its prefix
# happens to be about 1/5th of the way through the ring, so the
# predictor thinks we'll have 5 shares and that we'll delete them
# all. This part of the test depends upon the SIs landing right
self.failUnlessIn("The whole cycle is expected to examine "
"5 shares in 5 buckets and to recover: "
"5 shares, 5 buckets", s)
- d.addCallback(_check_html_in_cycle)
- # wait for the crawler to finish the first cycle. Two shares should
- # have been removed
- def _wait():
- return bool(lc.get_state()["last-cycle-finished"] is not None)
- d.addCallback(lambda ign: self.poll(_wait))
+ return ac.set_hook('after_cycle')
+ d.addCallback(_check_html_in_cycle)
def _after_first_cycle(ignored):
self.failUnlessEqual(count_shares(immutable_si_0), 0)
self.failUnlessEqual(count_shares(immutable_si_1), 1)
- self.failUnlessEqual(count_leases(immutable_si_1), 1)
+ self.failUnlessEqual(count_leases(immutable_si_1), (1, 0))
self.failUnlessEqual(count_shares(mutable_si_2), 0)
self.failUnlessEqual(count_shares(mutable_si_3), 1)
- self.failUnlessEqual(count_leases(mutable_si_3), 1)
+ self.failUnlessEqual(count_leases(mutable_si_3), (1, 0))
- s = lc.get_state()
+ s = ac.get_state()
last = s["history"][0]
self.failUnlessEqual(last["expiration-enabled"], True)
- self.failUnlessEqual(last["configured-expiration-mode"],
- ("age", 2000, None, ("mutable", "immutable")))
- self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
+ cem = last["configured-expiration-mode"]
+ self.failUnlessEqual(cem[0], "age")
+ self.failUnlessEqual(cem[1], 2000)
+ self.failUnlessEqual(cem[2], None)
+ self.failUnlessEqual(cem[3][0], "mutable")
+ self.failUnlessEqual(cem[3][1], "immutable")
rec = last["space-recovered"]
self.failUnlessEqual(rec["examined-buckets"], 4)
self.failUnless(rec["actual-diskbytes"] >= 0,
rec["actual-diskbytes"])
d.addCallback(_after_first_cycle)
+
d.addCallback(lambda ign: self.render1(webstatus))
- def _check_html(html):
+ def _check_html_after_cycle(html):
s = remove_tags(html)
self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
- d.addCallback(_check_html)
+ d.addCallback(_check_html_after_cycle)
+ d.addBoth(self._wait_for_yield, ac)
return d
- def BROKEN_test_expire_cutoff_date(self):
- basedir = "storage/LeaseCrawler/expire_cutoff_date"
+ def test_expire_cutoff_date(self):
+ basedir = "storage/AccountingCrawler/expire_cutoff_date"
fileutil.make_dirs(basedir)
# setting cutoff-date to 2000 seconds ago means that any lease which
# is more than 2000s old will be expired.
now = time.time()
then = int(now - 2000)
- server = InstrumentedStorageServer(basedir, "\x00" * 20,
- expiration_enabled=True,
- expiration_mode="cutoff-date",
- expiration_cutoff_date=then)
- # make it start sooner than usual.
- lc = server.lease_checker
- lc.slow_start = 0
- lc.stop_after_first_bucket = True
+ ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then)
+ server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep)
+ aa = server.get_accountant().get_anonymous_account()
+ sa = server.get_accountant().get_starter_account()
+
+ # finish as fast as possible
+ ac = server.get_accounting_crawler()
+ ac.slow_start = 0
+ ac.cpu_slice = 500
+
webstatus = StorageStatus(server)
# create a few shares, with some leases on them
def _get_sharefile(si):
return list(server._iter_share_files(si))[0]
def count_leases(si):
- return len(list(_get_sharefile(si).get_leases()))
+ return (len(aa.get_leases(si)), len(sa.get_leases(si)))
self.failUnlessEqual(count_shares(immutable_si_0), 1)
- self.failUnlessEqual(count_leases(immutable_si_0), 1)
+ self.failUnlessEqual(count_leases(immutable_si_0), (1, 0))
self.failUnlessEqual(count_shares(immutable_si_1), 1)
- self.failUnlessEqual(count_leases(immutable_si_1), 2)
+ self.failUnlessEqual(count_leases(immutable_si_1), (1, 1))
self.failUnlessEqual(count_shares(mutable_si_2), 1)
- self.failUnlessEqual(count_leases(mutable_si_2), 1)
+ self.failUnlessEqual(count_leases(mutable_si_2), (1, 0))
self.failUnlessEqual(count_shares(mutable_si_3), 1)
- self.failUnlessEqual(count_leases(mutable_si_3), 2)
+ self.failUnlessEqual(count_leases(mutable_si_3), (1, 1))
- # artificially crank back the expiration time on the first lease of
- # each share, to make it look like was renewed 3000s ago. To achieve
- # this, we need to set the expiration time to now-3000+31days. This
- # will change when the lease format is improved to contain both
- # create/renew time and duration.
- new_expiration_time = now - 3000 + 31*24*60*60
+ # artificially crank back the renewal time on the first lease of each
+ # share to 3000s ago, and set the expiration time to 31 days later.
+ new_renewal_time = now - 3000
+ new_expiration_time = new_renewal_time + 31*24*60*60
# Some shares have an extra lease which is set to expire at the
# default time in 31 days from now (age=31days). We then run the
# crawler, which will expire the first lease, making some shares get
# deleted and others stay alive (with one remaining lease)
- sf0 = _get_sharefile(immutable_si_0)
- self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
+ aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
# immutable_si_1 gets an extra lease
- sf1 = _get_sharefile(immutable_si_1)
- self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
+ sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
- sf2 = _get_sharefile(mutable_si_2)
- self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
+ aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time)
# mutable_si_3 gets an extra lease
- sf3 = _get_sharefile(mutable_si_3)
- self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
+ sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time)
server.setServiceParent(self.s)
- d = fireEventually()
- # examine the state right after the first bucket has been processed
- def _after_first_bucket(ignored):
- p = lc.get_progress()
- if not p["cycle-in-progress"]:
- d2 = fireEventually()
- d2.addCallback(_after_first_bucket)
- return d2
- d.addCallback(_after_first_bucket)
+ # now examine the web status right after the 'aa' prefix has been processed.
+ d = self._after_prefix(None, 'aa', ac)
d.addCallback(lambda ign: self.render1(webstatus))
def _check_html_in_cycle(html):
s = remove_tags(html)
self.failUnlessIn("The whole cycle is expected to examine "
"5 shares in 5 buckets and to recover: "
"5 shares, 5 buckets", s)
- d.addCallback(_check_html_in_cycle)
- # wait for the crawler to finish the first cycle. Two shares should
- # have been removed
- def _wait():
- return bool(lc.get_state()["last-cycle-finished"] is not None)
- d.addCallback(lambda ign: self.poll(_wait))
+ return ac.set_hook('after_cycle')
+ d.addCallback(_check_html_in_cycle)
def _after_first_cycle(ignored):
self.failUnlessEqual(count_shares(immutable_si_0), 0)
self.failUnlessEqual(count_shares(immutable_si_1), 1)
- self.failUnlessEqual(count_leases(immutable_si_1), 1)
+ self.failUnlessEqual(count_leases(immutable_si_1), (1, 0))
self.failUnlessEqual(count_shares(mutable_si_2), 0)
self.failUnlessEqual(count_shares(mutable_si_3), 1)
- self.failUnlessEqual(count_leases(mutable_si_3), 1)
+ self.failUnlessEqual(count_leases(mutable_si_3), (1, 0))
- s = lc.get_state()
+ s = ac.get_state()
last = s["history"][0]
self.failUnlessEqual(last["expiration-enabled"], True)
- self.failUnlessEqual(last["configured-expiration-mode"],
- ("cutoff-date", None, then,
- ("mutable", "immutable")))
- self.failUnlessEqual(last["leases-per-share-histogram"],
- {1: 2, 2: 2})
+ cem = last["configured-expiration-mode"]
+ self.failUnlessEqual(cem[0], "cutoff-date")
+ self.failUnlessEqual(cem[1], None)
+ self.failUnlessEqual(cem[2], then)
+ self.failUnlessEqual(cem[3][0], "mutable")
+ self.failUnlessEqual(cem[3][1], "immutable")
rec = last["space-recovered"]
self.failUnlessEqual(rec["examined-buckets"], 4)
self.failUnless(rec["actual-diskbytes"] >= 0,
rec["actual-diskbytes"])
d.addCallback(_after_first_cycle)
+
d.addCallback(lambda ign: self.render1(webstatus))
- def _check_html(html):
+ def _check_html_after_cycle(html):
s = remove_tags(html)
self.failUnlessIn("Expiration Enabled:"
" expired leases will be removed", s)
substr = "Leases created or last renewed before %s will be considered expired." % date
self.failUnlessIn(substr, s)
self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
- d.addCallback(_check_html)
+ d.addCallback(_check_html_after_cycle)
+ d.addBoth(self._wait_for_yield, ac)
return d
-
def test_bad_mode(self):
- basedir = "storage/LeaseCrawler/bad_mode"
- fileutil.make_dirs(basedir)
- e = self.failUnlessRaises(ValueError,
- StorageServer, basedir, "\x00" * 20,
- expiration_mode="bogus")
+ e = self.failUnlessRaises(AssertionError,
+ ExpirationPolicy, enabled=True, mode="bogus")
self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
def test_parse_duration(self):
self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
self.failUnlessEqual(p("2009-03-18"), 1237334400)
- def BROKEN_test_limited_history(self):
- basedir = "storage/LeaseCrawler/limited_history"
+ def test_limited_history(self):
+ basedir = "storage/AccountingCrawler/limited_history"
fileutil.make_dirs(basedir)
server = StorageServer(basedir, "\x00" * 20)
- # make it start sooner than usual.
- lc = server.lease_checker
- lc.slow_start = 0
- lc.cpu_slice = 500
- # create a few shares, with some leases on them
+ # finish as fast as possible
+ RETAINED = 2
+ CYCLES = 4
+ ac = server.get_accounting_crawler()
+ ac._leasedb.retained_history_entries = RETAINED
+ ac.slow_start = 0
+ ac.cpu_slice = 500
+ ac.allowed_cpu_proportion = 1.0
+ ac.minimum_cycle_time = 0
+ # create a few shares, with some leases on them
self.make_shares(server)
server.setServiceParent(self.s)
- def _wait_until_15_cycles_done():
- last = lc.state["last-cycle-finished"]
- if last is not None and last >= 15:
- return True
- if lc.timer:
- lc.timer.reset(0)
- return False
- d = self.poll(_wait_until_15_cycles_done)
- def _check(ignored):
- s = lc.get_state()
- h = s["history"]
- self.failUnlessEqual(len(h), 10)
- self.failUnlessEqual(max(h.keys()), 15)
- self.failUnlessEqual(min(h.keys()), 6)
- d.addCallback(_check)
+ d = ac.set_hook('after_cycle')
+ def _after_cycle(cycle):
+ if cycle < CYCLES:
+ return ac.set_hook('after_cycle').addCallback(_after_cycle)
+
+ state = ac.get_state()
+ self.failUnlessIn("history", state)
+ h = state["history"]
+ self.failUnlessEqual(len(h), RETAINED)
+ self.failUnlessEqual(max(h.keys()), CYCLES)
+ self.failUnlessEqual(min(h.keys()), CYCLES-RETAINED+1)
+ d.addCallback(_after_cycle)
+ d.addBoth(self._wait_for_yield, ac)
return d
- def BROKEN_test_unpredictable_future(self):
- basedir = "storage/LeaseCrawler/unpredictable_future"
+ def OFF_test_unpredictable_future(self):
+ basedir = "storage/AccountingCrawler/unpredictable_future"
fileutil.make_dirs(basedir)
server = StorageServer(basedir, "\x00" * 20)
+
# make it start sooner than usual.
- lc = server.lease_checker
- lc.slow_start = 0
- lc.cpu_slice = -1.0 # stop quickly
+ ac = server.get_accounting_crawler()
+ ac.slow_start = 0
+ ac.cpu_slice = -1.0 # stop quickly
self.make_shares(server)
# progress-measurer gets smart enough to count buckets (we'll
# have to interrupt it even earlier, before it's finished the
# first bucket).
- s = lc.get_state()
+ s = ac.get_state()
if "cycle-to-date" not in s:
- d2 = fireEventually()
- d2.addCallback(_check)
- return d2
+ return reactor.callLater(0.2, _check)
self.failUnlessIn("cycle-to-date", s)
self.failUnlessIn("estimated-remaining-cycle", s)
self.failUnlessIn("estimated-current-cycle", s)
self.failUnlessEqual(full["actual-buckets"], None)
self.failUnlessEqual(full["actual-shares"], None)
self.failUnlessEqual(full["actual-diskbytes"], None)
- return d
-
- def BROKEN_test_no_st_blocks(self):
- basedir = "storage/LeaseCrawler/no_st_blocks"
- fileutil.make_dirs(basedir)
- ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
- expiration_mode="age",
- expiration_override_lease_duration=-1000)
- # a negative expiration_time= means the "configured-"
- # space-recovered counts will be non-zero, since all shares will have
- # expired by then
- # make it start sooner than usual.
- lc = ss.lease_checker
- lc.slow_start = 0
-
- self.make_shares(ss)
- ss.setServiceParent(self.s)
- def _wait():
- return bool(lc.get_state()["last-cycle-finished"] is not None)
- d = self.poll(_wait)
-
- def _check(ignored):
- s = lc.get_state()
- last = s["history"][0]
- rec = last["space-recovered"]
- self.failUnlessEqual(rec["configured-buckets"], 4)
- self.failUnlessEqual(rec["configured-shares"], 4)
- self.failUnless(rec["configured-sharebytes"] > 0,
- rec["configured-sharebytes"])
- # without the .st_blocks field in os.stat() results, we should be
- # reporting diskbytes==sharebytes
- self.failUnlessEqual(rec["configured-sharebytes"],
- rec["configured-diskbytes"])
d.addCallback(_check)
return d
- def BROKEN_test_share_corruption(self):
- self._poll_should_ignore_these_errors = [
- UnknownMutableContainerVersionError,
- UnknownImmutableContainerVersionError,
- ]
- basedir = "storage/LeaseCrawler/share_corruption"
- fileutil.make_dirs(basedir)
- ss = InstrumentedStorageServer(basedir, "\x00" * 20)
- w = StorageStatus(ss)
- # make it start sooner than usual.
- lc = ss.lease_checker
- lc.stop_after_first_bucket = True
- lc.slow_start = 0
- lc.cpu_slice = 500
-
- # create a few shares, with some leases on them
- self.make_shares(ss)
-
- # now corrupt one, and make sure the lease-checker keeps going
- [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
- first = min(self.sis)
- first_b32 = base32.b2a(first)
- fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
- f = open(fn, "rb+")
- f.seek(0)
- f.write("BAD MAGIC")
- f.close()
- # if get_share_file() doesn't see the correct mutable magic, it
- # assumes the file is an immutable share, and then
- # immutable.ShareFile sees a bad version. So regardless of which kind
- # of share we corrupted, this will trigger an
- # UnknownImmutableContainerVersionError.
-
- # also create an empty bucket
- empty_si = base32.b2a("\x04"*16)
- empty_bucket_dir = os.path.join(ss.sharedir,
- storage_index_to_dir(empty_si))
- fileutil.make_dirs(empty_bucket_dir)
-
- ss.setServiceParent(self.s)
-
- d = fireEventually()
-
- # now examine the state right after the first bucket has been
- # processed.
- def _after_first_bucket(ignored):
- s = lc.get_state()
- if "cycle-to-date" not in s:
- d2 = fireEventually()
- d2.addCallback(_after_first_bucket)
- return d2
- so_far = s["cycle-to-date"]
- rec = so_far["space-recovered"]
- self.failUnlessEqual(rec["examined-buckets"], 1)
- self.failUnlessEqual(rec["examined-shares"], 0)
- self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
- d.addCallback(_after_first_bucket)
-
- d.addCallback(lambda ign: self.render_json(w))
- def _check_json(json):
- data = simplejson.loads(json)
- # grr. json turns all dict keys into strings.
- so_far = data["lease-checker"]["cycle-to-date"]
- corrupt_shares = so_far["corrupt-shares"]
- # it also turns all tuples into lists
- self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
- d.addCallback(_check_json)
- d.addCallback(lambda ign: self.render1(w))
- def _check_html(html):
- s = remove_tags(html)
- self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
- d.addCallback(_check_html)
-
- def _wait():
- return bool(lc.get_state()["last-cycle-finished"] is not None)
- d.addCallback(lambda ign: self.poll(_wait))
-
- def _after_first_cycle(ignored):
- s = lc.get_state()
- last = s["history"][0]
- rec = last["space-recovered"]
- self.failUnlessEqual(rec["examined-buckets"], 5)
- self.failUnlessEqual(rec["examined-shares"], 3)
- self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
- d.addCallback(_after_first_cycle)
- d.addCallback(lambda ign: self.render_json(w))
- def _check_json_history(json):
- data = simplejson.loads(json)
- last = data["lease-checker"]["history"]["0"]
- corrupt_shares = last["corrupt-shares"]
- self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
- d.addCallback(_check_json_history)
- d.addCallback(lambda ign: self.render1(w))
- def _check_html_history(html):
- s = remove_tags(html)
- self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
- d.addCallback(_check_html_history)
-
- def _cleanup(res):
- self.flushLoggedErrors(UnknownMutableContainerVersionError,
- UnknownImmutableContainerVersionError)
- return res
- d.addBoth(_cleanup)
- return d
-
def render_json(self, page):
d = self.render1(page, args={"t": ["json"]})
return d
-class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
+class WebStatus(unittest.TestCase, WebRenderingMixin):
def setUp(self):
self.s = service.MultiService()
self.s.startService()