From: Brian Warner Date: Sun, 2 Sep 2007 21:47:15 +0000 (-0700) Subject: storage: replace sqlite with in-share lease records X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=0fe120578956ffc1ee05927ba3d9a24369a14ca2;p=tahoe-lafs%2Ftahoe-lafs.git storage: replace sqlite with in-share lease records --- diff --git a/MANIFEST.in b/MANIFEST.in index 67695049..3282c071 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,2 @@ include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css -include allmydata/*.sql diff --git a/README b/README index a0edc0f3..4e495450 100644 --- a/README +++ b/README @@ -109,8 +109,6 @@ gcc make python-dev python-twisted python-nevow python-pyopenssl". libraries with the cygwin package management tool, then get the pyOpenSSL source code, cd into it, and run "python ./setup.py install". - + pysqlite3 (database library) - + the pywin32 package: only required on Windows http://sourceforge.net/projects/pywin32/ diff --git a/setup.py b/setup.py index 1b2ed117..9d18432d 100644 --- a/setup.py +++ b/setup.py @@ -90,8 +90,7 @@ setup(name='allmydata-tahoe', ], package_dir={ "allmydata": "src/allmydata",}, scripts = ["bin/allmydata-tahoe"], - package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css', - 'owner.sql'] }, + package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] }, classifiers=trove_classifiers, test_suite="allmydata.test", ext_modules=[ diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 1d1a60a0..123090ab 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -3,7 +3,6 @@ import os, re, weakref, stat, struct, time from foolscap import Referenceable from twisted.application import service from twisted.internet import defer -from twisted.python import util from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ @@ -11,60 +10,180 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition -try: - # python2.5 ships with sqlite builtin - import sqlite3.dbapi2 - sqlite = sqlite3.dbapi2 -except ImportError: - # for python2.4, it's installed under a different name - import pysqlite2.dbapi2 - sqlite = pysqlite2.dbapi2 - -# store/ -# store/owners.db -# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success -# store/shares/$STORAGEINDEX -# store/shares/$STORAGEINDEX/$SHARENUM -# store/shares/$STORAGEINDEX/$SHARENUM/blocksize -# store/shares/$STORAGEINDEX/$SHARENUM/data -# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes -# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree +# storage/ +# storage/shares/incoming +# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be +# moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success +# storage/shares/$STORAGEINDEX +# storage/shares/$STORAGEINDEX/$SHARENUM # $SHARENUM matches this regex: NUM_RE=re.compile("[0-9]*") +# each share file (in storage/shares/$SI/$SHNUM) contains lease information +# and share data. The share data is accessed by RIBucketWriter.write and +# RIBucketReader.read . The lease information is not accessible through these +# interfaces. + +# The share file has the following layout: +# 0x00: share file version number, four bytes, current version is 1 +# 0x04: share data length, four bytes big-endian = A +# 0x08: number of leases, four bytes big-endian +# 0x0c: beginning of share data (described below, at WriteBucketProxy) +# A+0x0c = B: first lease. Lease format is: +# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner +# B+0x04: renew secret, 32 bytes (SHA256) +# B+0x24: cancel secret, 32 bytes (SHA256) +# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch +# B+0x48: next lease, or end of record + +class ShareFile: + LEASE_SIZE = struct.calcsize(">L32s32sL") + + def __init__(self, filename): + self.home = filename + f = open(self.home, 'rb') + (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc)) + assert version == 1 + self._size = size + self._num_leases = num_leases + self._data_offset = 0xc + self._lease_offset = 0xc + self._size + + def read_share_data(self, offset, length): + precondition(offset >= 0) + precondition(offset+length <= self._size) + f = open(self.home, 'rb') + f.seek(self._data_offset+offset) + return f.read(length) + + def write_share_data(self, offset, data): + length = len(data) + precondition(offset >= 0) + precondition(offset+length <= self._size) + f = open(self.home, 'rb+') + real_offset = self._data_offset+offset + f.seek(real_offset) + assert f.tell() == real_offset + f.write(data) + f.close() + + def _write_lease_record(self, f, lease_number, lease_info): + (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info + offset = self._lease_offset + lease_number * self.LEASE_SIZE + f.seek(offset) + assert f.tell() == offset + f.write(struct.pack(">L32s32sL", + owner_num, renew_secret, cancel_secret, + expiration_time)) + + def _read_num_leases(self, f): + f.seek(0x08) + (num_leases,) = struct.unpack(">L", f.read(4)) + return num_leases + + def _write_num_leases(self, f, num_leases): + f.seek(0x08) + f.write(struct.pack(">L", num_leases)) + + def _truncate_leases(self, f, num_leases): + f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) + + def iter_leases(self): + """Yields (ownernum, renew_secret, cancel_secret, expiration_time) + for all leases.""" + f = open(self.home, 'rb') + (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc)) + f.seek(self._lease_offset) + for i in range(num_leases): + data = f.read(self.LEASE_SIZE) + if data: + yield struct.unpack(">L32s32sL", data) + + def add_lease(self, lease_info): + f = open(self.home, 'rb+') + num_leases = self._read_num_leases(f) + self._write_lease_record(f, num_leases, lease_info) + self._write_num_leases(f, num_leases+1) + f.close() + + def renew_lease(self, renew_secret, new_expire_time): + for i,(on,rs,cs,et) in enumerate(self.iter_leases()): + if rs == renew_secret: + # yup. See if we need to update the owner time. + if new_expire_time > et: + # yes + new_lease = (on,rs,cs,new_expire_time) + f = open(self.home, 'rb+') + self._write_lease_record(f, i, new_lease) + f.close() + return + raise IndexError("unable to renew non-existent lease") + + def cancel_lease(self, cancel_secret): + """Remove a lease with the given cancel_secret. Return + (num_remaining_leases, space_freed). Raise IndexError if there was no + lease with the given cancel_secret.""" + + leases = list(self.iter_leases()) + num_leases = len(leases) + num_leases_removed = 0 + for i,lease_info in enumerate(leases[:]): + (on,rs,cs,et) = lease_info + if cs == cancel_secret: + leases[i] = None + num_leases_removed += 1 + if not num_leases_removed: + raise IndexError("unable to find matching lease to cancel") + if num_leases_removed: + # pack and write out the remaining leases. We write these out in + # the same order as they were added, so that if we crash while + # doing this, we won't lose any non-cancelled leases. + leases = [l for l in leases if l] # remove the cancelled leases + f = open(self.home, 'rb+') + for i,lease in enumerate(leases): + self._write_lease_record(f, i, lease) + self._write_num_leases(f, len(leases)) + self._truncate_leases(f, len(leases)) + f.close() + return len(leases), self.LEASE_SIZE * num_leases_removed + + class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, ss, incominghome, finalhome, size): + def __init__(self, ss, incominghome, finalhome, size, lease_info): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self._size = size + self._lease_info = lease_info self.closed = False self.throw_out_all_data = False - # touch the file, so later callers will see that we're working on it - f = open(self.incominghome, 'ab') + # touch the file, so later callers will see that we're working on it. + # Also construct the metadata. + assert not os.path.exists(self.incominghome) + f = open(self.incominghome, 'wb') + f.write(struct.pack(">LLL", 1, size, 0)) f.close() + self._sharefile = ShareFile(self.incominghome) def allocated_size(self): return self._size def remote_write(self, offset, data): precondition(not self.closed) - precondition(offset >= 0) - precondition(offset+len(data) <= self._size) if self.throw_out_all_data: return - f = open(self.incominghome, 'ab') - f.seek(offset) - f.write(data) - f.close() + self._sharefile.write_share_data(offset, data) def remote_close(self): precondition(not self.closed) + self._sharefile.add_lease(self._lease_info) fileutil.rename(self.incominghome, self.finalhome) + self._sharefile = None self.closed = True + filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) @@ -73,12 +192,10 @@ class BucketReader(Referenceable): implements(RIBucketReader) def __init__(self, home): - self.home = home + self._share_file = ShareFile(home) def remote_read(self, offset, length): - f = open(self.home, 'rb') - f.seek(offset) - return f.read(length) + return self._share_file.read_share_data(offset, length) class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) @@ -97,27 +214,11 @@ class StorageServer(service.MultiService, Referenceable): fileutil.make_dirs(self.incomingdir) self._active_writers = weakref.WeakKeyDictionary() - self.init_db() - self.measure_size() def _clean_incomplete(self): fileutil.rm_dir(self.incomingdir) - def init_db(self): - # files in storedir with non-zbase32 characters in it (like ".") are - # safe, in that they cannot be accessed or overwritten by clients - # (whose binary storage_index values are always converted into a - # filename with idlib.b2a) - db_file = os.path.join(self.storedir, "owners.db") - need_to_init_db = not os.path.exists(db_file) - self._owner_db_con = sqlite.connect(db_file) - self._owner_db_cur = self._owner_db_con.cursor() - if need_to_init_db: - setup_file = util.sibpath(__file__, "owner.sql") - setup = open(setup_file, "r").read() - self._owner_db_cur.executescript(setup) - def measure_size(self): self.consumed = fileutil.du(self.sharedir) @@ -130,10 +231,21 @@ class StorageServer(service.MultiService, Referenceable): def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - canary): + canary, owner_num=0): + # owner_num is not for clients to set, but rather it should be + # curried into the PersonalStorageServer instance that is dedicated + # to a particular owner. alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter si_s = idlib.b2a(storage_index) + + # in this implementation, the lease information (including secrets) + # goes into the share files themselves. It could also be put into a + # separate database. Note that the lease should not be added until + # the BucketWrite has been closed. + expire_time = time.time() + 31*24*60*60 + lease_info = (owner_num, renew_secret, cancel_secret, expire_time) + space_per_bucket = allocated_size no_limits = self.sizelimit is None yes_limits = not no_limits @@ -144,10 +256,17 @@ class StorageServer(service.MultiService, Referenceable): finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum) if os.path.exists(incominghome) or os.path.exists(finalhome): alreadygot.add(shnum) + # add a lease + if os.path.exists(incominghome): + # TODO: add a lease to the still-in-construction share + pass + else: + sf = ShareFile(finalhome) + sf.add_lease(lease_info) elif no_limits or remaining_space >= space_per_bucket: fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) bw = BucketWriter(self, incominghome, finalhome, - space_per_bucket) + space_per_bucket, lease_info) if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw @@ -161,109 +280,56 @@ class StorageServer(service.MultiService, Referenceable): if bucketwriters: fileutil.make_dirs(os.path.join(self.sharedir, si_s)) - # now store the secrets somewhere. This requires a - # variable-length-list of (renew,cancel) secret tuples per bucket. - # Note that this does not need to be kept inside the share itself, if - # packing efficiency is a concern. For this implementation, we use a - # sqlite database, which puts everything in a single file. - self.add_lease(storage_index, renew_secret, cancel_secret) - return alreadygot, bucketwriters - def add_lease(self, storage_index, renew_secret, cancel_secret): - # is the bucket already in our database? - cur = self._owner_db_cur - cur.execute("SELECT bucket_id FROM buckets" - " WHERE storage_index = ?", - (storage_index,)) - res = cur.fetchone() - if res: - bucket_id = res[0] - else: - cur.execute("INSERT INTO buckets (storage_index)" - " values(?)", (storage_index,)) - cur.execute("SELECT bucket_id FROM buckets" - " WHERE storage_index = ?", - (storage_index,)) - res = cur.fetchone() - bucket_id = res[0] - - # what time will this lease expire? One month from now. - expire_time = time.time() + 31*24*60*60 - - # now, is this lease already in our database? Since we don't have - # owners yet, look for a match by renew_secret/cancel_secret - cur.execute("SELECT lease_id FROM leases" - " WHERE renew_secret = ? AND cancel_secret = ?", - (renew_secret, cancel_secret)) - res = cur.fetchone() - if res: - # yes, so just update the timestamp - lease_id = res[0] - cur.execute("UPDATE leases" - " SET expire_time = ?" - " WHERE lease_id = ?", - (expire_time, lease_id)) - else: - # no, we need to add the lease - cur.execute("INSERT INTO leases " - "(bucket_id, renew_secret, cancel_secret, expire_time)" - " values(?,?,?,?)", - (bucket_id, renew_secret, cancel_secret, expire_time)) - self._owner_db_con.commit() + def get_or_add_owner(self, owner): + # this will be more fully implemented when we get the Introduction + # protocol built. At that point, it should take the 'owner' argument + # (either a FURL or a Sealer/public-key) and look it up in a + # persistent table, returning a short integer. If the owner doesn't + # yet exist in the table, create a new entry for it and return the + # new index. + return 0 def remote_renew_lease(self, storage_index, renew_secret): - # find the lease - cur = self._owner_db_cur - cur.execute("SELECT leases.lease_id FROM buckets, leases" - " WHERE buckets.storage_index = ?" - " AND buckets.bucket_id = leases.bucket_id" - " AND leases.renew_secret = ?", - (storage_index, renew_secret)) - res = cur.fetchone() - if res: - # found it, now update it. The new leases will expire one month - # from now. - expire_time = time.time() + 31*24*60*60 - lease_id = res[0] - cur.execute("UPDATE leases" - " SET expire_time = ?" - " WHERE lease_id = ?", - (expire_time, lease_id)) - else: - # no such lease - raise IndexError("No such lease") - self._owner_db_con.commit() + new_expire_time = time.time() + 31*24*60*60 + found_buckets = False + for shnum, filename in self._get_bucket_shares(storage_index): + found_buckets = True + sf = ShareFile(filename) + sf.renew_lease(renew_secret, new_expire_time) + if not found_buckets: + raise IndexError("no such lease to renew") def remote_cancel_lease(self, storage_index, cancel_secret): - # find the lease - cur = self._owner_db_cur - cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id" - " FROM buckets b, leases l" - " WHERE b.storage_index = ?" - " AND b.bucket_id = l.bucket_id" - " AND l.cancel_secret = ?", - (storage_index, cancel_secret)) - res = cur.fetchone() - if res: - # found it - lease_id, storage_index, bucket_id = res - cur.execute("DELETE FROM leases WHERE lease_id = ?", - (lease_id,)) - # was that the last one? - cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?", - (bucket_id,)) - res = cur.fetchone() - remaining_leases = res[0] - if not remaining_leases: - # delete the share - cur.execute("DELETE FROM buckets WHERE bucket_id = ?", - (bucket_id,)) - self.delete_bucket(storage_index) - else: - # no such lease - raise IndexError("No such lease") - self._owner_db_con.commit() + storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) + + remaining_files = 0 + total_space_freed = 0 + found_buckets = False + for shnum, filename in self._get_bucket_shares(storage_index): + # note: if we can't find a lease on one share, we won't bother + # looking in the others. Unless something broke internally + # (perhaps we ran out of disk space while adding a lease), the + # leases on all shares will be identical. + found_buckets = True + sf = ShareFile(filename) + # this raises IndexError if the lease wasn't present + remaining_leases, space_freed = sf.cancel_lease(cancel_secret) + total_space_freed += space_freed + if remaining_leases: + remaining_files += 1 + else: + # now remove the sharefile. We'll almost certainly be + # removing the entire directory soon. + filelen = os.stat(filename)[stat.ST_SIZE] + os.unlink(filename) + total_space_freed += filelen + if not remaining_files: + os.rmdir(storagedir) + self.consumed -= total_space_freed + if not found_buckets: + raise IndexError("no such lease to cancel") def delete_bucket(self, storage_index): storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) @@ -277,20 +343,44 @@ class StorageServer(service.MultiService, Referenceable): self.consumed += consumed_size del self._active_writers[bw] - def remote_get_buckets(self, storage_index): - bucketreaders = {} # k: sharenum, v: BucketReader + def _get_bucket_shares(self, storage_index): + """Return a list of (shnum, pathname) tuples for files that hold + shares for this storage_index. In each tuple, 'shnum' will always be + the integer form of the last component of 'pathname'.""" storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) try: for f in os.listdir(storagedir): if NUM_RE.match(f): - br = BucketReader(os.path.join(storagedir, f)) - bucketreaders[int(f)] = br + filename = os.path.join(storagedir, f) + yield (int(f), filename) except OSError: # Commonly caused by there being no buckets at all. pass + def remote_get_buckets(self, storage_index): + bucketreaders = {} # k: sharenum, v: BucketReader + for shnum, filename in self._get_bucket_shares(storage_index): + bucketreaders[shnum] = BucketReader(filename) return bucketreaders + def get_leases(self, storage_index): + """Provide an iterator that yields all of the leases attached to this + bucket. Each lease is returned as a tuple of (owner_num, + renew_secret, cancel_secret, expiration_time). + + This method is not for client use. + """ + + # since all shares get the same lease data, we just grab the leases + # from the first share + try: + shnum, filename = self._get_bucket_shares(storage_index).next() + sf = ShareFile(filename) + return sf.iter_leases() + except StopIteration: + return iter([]) + + """ Share data is written into a single file. At the start of the file, there is a series of four-byte big-endian offset values, which indicate where each diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index e25f7333..61cd2804 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -4,7 +4,7 @@ from twisted.trial import unittest from twisted.application import service from twisted.internet import defer from foolscap import Referenceable -import os.path +import time, os.path, stat import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil @@ -23,9 +23,16 @@ class Bucket(unittest.TestCase): def bucket_writer_closed(self, bw, consumed): pass + def make_lease(self): + owner_num = 0 + renew_secret = os.urandom(32) + cancel_secret = os.urandom(32) + expiration_time = time.time() + 5000 + return (owner_num, renew_secret, cancel_secret, expiration_time) + def test_create(self): incoming, final = self.make_workdir("test_create") - bw = BucketWriter(self, incoming, final, 200) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*25) @@ -34,7 +41,7 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = BucketWriter(self, incoming, final, 200) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*7) # last block may be short @@ -61,11 +68,18 @@ class BucketProxy(unittest.TestCase): final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = BucketWriter(self, incoming, final, size) + bw = BucketWriter(self, incoming, final, size, self.make_lease()) rb = RemoteBucket() rb.target = bw return bw, rb, final + def make_lease(self): + owner_num = 0 + renew_secret = os.urandom(32) + cancel_secret = os.urandom(32) + expiration_time = time.time() + 5000 + return (owner_num, renew_secret, cancel_secret, expiration_time) + def bucket_writer_closed(self, bw, consumed): pass @@ -225,16 +239,21 @@ class Server(unittest.TestCase): self.failUnlessEqual(set(writers.keys()), set([5])) def test_sizelimits(self): - ss = self.create("test_sizelimits", 100) + ss = self.create("test_sizelimits", 5000) canary = Referenceable() - - already,writers = self.allocate(ss, "vid1", [0,1,2], 25) + # a newly created and filled share incurs this much overhead, beyond + # the size we request. + OVERHEAD = 3*4 + LEASE_SIZE = 4+32+32+4 + + already,writers = self.allocate(ss, "vid1", [0,1,2], 1000) self.failUnlessEqual(len(writers), 3) - # now the StorageServer should have 75 bytes provisionally allocated, - # allowing only 25 more to be claimed + # now the StorageServer should have 3000 bytes provisionally + # allocated, allowing only 2000 more to be claimed self.failUnlessEqual(len(ss._active_writers), 3) - already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25) + # allocating 1001-byte shares only leaves room for one + already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001) self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(ss._active_writers), 4) @@ -243,9 +262,11 @@ class Server(unittest.TestCase): del already del writers self.failUnlessEqual(len(ss._active_writers), 1) + # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should - # become real, long-term allocation + # become real, long-term allocation, and grows to include the + # overhead. for bw in writers2.values(): bw.remote_write(0, "a"*25) bw.remote_close() @@ -254,10 +275,12 @@ class Server(unittest.TestCase): del bw self.failUnlessEqual(len(ss._active_writers), 0) - # now there should be 25 bytes allocated, and 75 free - already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25) - self.failUnlessEqual(len(writers3), 3) - self.failUnlessEqual(len(ss._active_writers), 3) + allocated = 1001 + OVERHEAD + LEASE_SIZE + # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and + # 5000-1085=3915 free, therefore we can fit 39 100byte shares + already3,writers3 = self.allocate(ss,"vid3", range(100), 100) + self.failUnlessEqual(len(writers3), 39) + self.failUnlessEqual(len(ss._active_writers), 39) del already3 del writers3 @@ -266,13 +289,37 @@ class Server(unittest.TestCase): del ss # creating a new StorageServer in the same directory should see the - # same usage. note that metadata will be counted at startup but not - # during runtime, so if we were creating any metadata, the allocation - # would be more than 25 bytes and this test would need to be changed. - ss = self.create("test_sizelimits", 100) - already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25) - self.failUnlessEqual(len(writers4), 3) - self.failUnlessEqual(len(ss._active_writers), 3) + # same usage. + + # metadata that goes into the share file is counted upon share close, + # as well as at startup. metadata that goes into other files will not + # be counted until the next startup, so if we were creating any + # extra-file metadata, the allocation would be more than 'allocated' + # and this test would need to be changed. + ss = self.create("test_sizelimits", 5000) + already4,writers4 = self.allocate(ss, "vid4", range(100), 100) + self.failUnlessEqual(len(writers4), 39) + self.failUnlessEqual(len(ss._active_writers), 39) + + def test_seek(self): + basedir = self.workdir("test_seek_behavior") + fileutil.make_dirs(basedir) + filename = os.path.join(basedir, "testfile") + f = open(filename, "wb") + f.write("start") + f.close() + # mode="w" allows seeking-to-create-holes, but truncates pre-existing + # files. mode="a" preserves previous contents but does not allow + # seeking-to-create-holes. mode="r+" allows both. + f = open(filename, "rb+") + f.seek(100) + f.write("100") + f.close() + filelen = os.stat(filename)[stat.ST_SIZE] + self.failUnlessEqual(filelen, 100+3) + f2 = open(filename, "rb") + self.failUnlessEqual(f2.read(5), "start") + def test_leases(self): ss = self.create("test_leases") @@ -289,6 +336,10 @@ class Server(unittest.TestCase): for wb in writers.values(): wb.remote_close() + leases = list(ss.get_leases("si0")) + self.failUnlessEqual(len(leases), 1) + self.failUnlessEqual(set([l[1] for l in leases]), set([rs0])) + rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()), hashutil.tagged_hash("blah", "%d" % self._secret.next())) already,writers = ss.remote_allocate_buckets("si1", rs1, cs1, @@ -304,6 +355,10 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(already), 5) self.failUnlessEqual(len(writers), 0) + leases = list(ss.get_leases("si1")) + self.failUnlessEqual(len(leases), 2) + self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2])) + # check that si0 is readable readers = ss.remote_get_buckets("si0") self.failUnlessEqual(len(readers), 5) @@ -336,6 +391,10 @@ class Server(unittest.TestCase): # the corresponding renew should no longer work self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1) + leases = list(ss.get_leases("si1")) + self.failUnlessEqual(len(leases), 1) + self.failUnlessEqual(set([l[1] for l in leases]), set([rs2])) + ss.remote_renew_lease("si1", rs2) # cancelling the second should make it go away ss.remote_cancel_lease("si1", cs2) @@ -344,3 +403,6 @@ class Server(unittest.TestCase): self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1) self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2) + leases = list(ss.get_leases("si1")) + self.failUnlessEqual(len(leases), 0) +