from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
-from twisted.python import util
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition
-try:
- # python2.5 ships with sqlite builtin
- import sqlite3.dbapi2
- sqlite = sqlite3.dbapi2
-except ImportError:
- # for python2.4, it's installed under a different name
- import pysqlite2.dbapi2
- sqlite = pysqlite2.dbapi2
-
-# store/
-# store/owners.db
-# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
-# store/shares/$STORAGEINDEX
-# store/shares/$STORAGEINDEX/$SHARENUM
-# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
-# store/shares/$STORAGEINDEX/$SHARENUM/data
-# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
+# storage/
+# storage/shares/incoming
+# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
+# moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
+# storage/shares/$STORAGEINDEX
+# storage/shares/$STORAGEINDEX/$SHARENUM
# $SHARENUM matches this regex:
NUM_RE=re.compile("[0-9]*")
+# each share file (in storage/shares/$SI/$SHNUM) contains lease information
+# and share data. The share data is accessed by RIBucketWriter.write and
+# RIBucketReader.read . The lease information is not accessible through these
+# interfaces.
+
+# The share file has the following layout:
+# 0x00: share file version number, four bytes, current version is 1
+# 0x04: share data length, four bytes big-endian = A
+# 0x08: number of leases, four bytes big-endian
+# 0x0c: beginning of share data (described below, at WriteBucketProxy)
+# A+0x0c = B: first lease. Lease format is:
+# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
+# B+0x04: renew secret, 32 bytes (SHA256)
+# B+0x24: cancel secret, 32 bytes (SHA256)
+# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
+# B+0x48: next lease, or end of record
+
+class ShareFile:
+ LEASE_SIZE = struct.calcsize(">L32s32sL")
+
+ def __init__(self, filename):
+ self.home = filename
+ f = open(self.home, 'rb')
+ (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
+ assert version == 1
+ self._size = size
+ self._num_leases = num_leases
+ self._data_offset = 0xc
+ self._lease_offset = 0xc + self._size
+
+ def read_share_data(self, offset, length):
+ precondition(offset >= 0)
+ precondition(offset+length <= self._size)
+ f = open(self.home, 'rb')
+ f.seek(self._data_offset+offset)
+ return f.read(length)
+
+ def write_share_data(self, offset, data):
+ length = len(data)
+ precondition(offset >= 0)
+ precondition(offset+length <= self._size)
+ f = open(self.home, 'rb+')
+ real_offset = self._data_offset+offset
+ f.seek(real_offset)
+ assert f.tell() == real_offset
+ f.write(data)
+ f.close()
+
+ def _write_lease_record(self, f, lease_number, lease_info):
+ (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
+ offset = self._lease_offset + lease_number * self.LEASE_SIZE
+ f.seek(offset)
+ assert f.tell() == offset
+ f.write(struct.pack(">L32s32sL",
+ owner_num, renew_secret, cancel_secret,
+ expiration_time))
+
+ def _read_num_leases(self, f):
+ f.seek(0x08)
+ (num_leases,) = struct.unpack(">L", f.read(4))
+ return num_leases
+
+ def _write_num_leases(self, f, num_leases):
+ f.seek(0x08)
+ f.write(struct.pack(">L", num_leases))
+
+ def _truncate_leases(self, f, num_leases):
+ f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+
+ def iter_leases(self):
+ """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
+ for all leases."""
+ f = open(self.home, 'rb')
+ (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
+ f.seek(self._lease_offset)
+ for i in range(num_leases):
+ data = f.read(self.LEASE_SIZE)
+ if data:
+ yield struct.unpack(">L32s32sL", data)
+
+ def add_lease(self, lease_info):
+ f = open(self.home, 'rb+')
+ num_leases = self._read_num_leases(f)
+ self._write_lease_record(f, num_leases, lease_info)
+ self._write_num_leases(f, num_leases+1)
+ f.close()
+
+ def renew_lease(self, renew_secret, new_expire_time):
+ for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
+ if rs == renew_secret:
+ # yup. See if we need to update the owner time.
+ if new_expire_time > et:
+ # yes
+ new_lease = (on,rs,cs,new_expire_time)
+ f = open(self.home, 'rb+')
+ self._write_lease_record(f, i, new_lease)
+ f.close()
+ return
+ raise IndexError("unable to renew non-existent lease")
+
+ def cancel_lease(self, cancel_secret):
+ """Remove a lease with the given cancel_secret. Return
+ (num_remaining_leases, space_freed). Raise IndexError if there was no
+ lease with the given cancel_secret."""
+
+ leases = list(self.iter_leases())
+ num_leases = len(leases)
+ num_leases_removed = 0
+ for i,lease_info in enumerate(leases[:]):
+ (on,rs,cs,et) = lease_info
+ if cs == cancel_secret:
+ leases[i] = None
+ num_leases_removed += 1
+ if not num_leases_removed:
+ raise IndexError("unable to find matching lease to cancel")
+ if num_leases_removed:
+ # pack and write out the remaining leases. We write these out in
+ # the same order as they were added, so that if we crash while
+ # doing this, we won't lose any non-cancelled leases.
+ leases = [l for l in leases if l] # remove the cancelled leases
+ f = open(self.home, 'rb+')
+ for i,lease in enumerate(leases):
+ self._write_lease_record(f, i, lease)
+ self._write_num_leases(f, len(leases))
+ self._truncate_leases(f, len(leases))
+ f.close()
+ return len(leases), self.LEASE_SIZE * num_leases_removed
+
+
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, ss, incominghome, finalhome, size):
+ def __init__(self, ss, incominghome, finalhome, size, lease_info):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self._size = size
+ self._lease_info = lease_info
self.closed = False
self.throw_out_all_data = False
- # touch the file, so later callers will see that we're working on it
- f = open(self.incominghome, 'ab')
+ # touch the file, so later callers will see that we're working on it.
+ # Also construct the metadata.
+ assert not os.path.exists(self.incominghome)
+ f = open(self.incominghome, 'wb')
+ f.write(struct.pack(">LLL", 1, size, 0))
f.close()
+ self._sharefile = ShareFile(self.incominghome)
def allocated_size(self):
return self._size
def remote_write(self, offset, data):
precondition(not self.closed)
- precondition(offset >= 0)
- precondition(offset+len(data) <= self._size)
if self.throw_out_all_data:
return
- f = open(self.incominghome, 'ab')
- f.seek(offset)
- f.write(data)
- f.close()
+ self._sharefile.write_share_data(offset, data)
def remote_close(self):
precondition(not self.closed)
+ self._sharefile.add_lease(self._lease_info)
fileutil.rename(self.incominghome, self.finalhome)
+ self._sharefile = None
self.closed = True
+
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
implements(RIBucketReader)
def __init__(self, home):
- self.home = home
+ self._share_file = ShareFile(home)
def remote_read(self, offset, length):
- f = open(self.home, 'rb')
- f.seek(offset)
- return f.read(length)
+ return self._share_file.read_share_data(offset, length)
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
- self.init_db()
-
self.measure_size()
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
- def init_db(self):
- # files in storedir with non-zbase32 characters in it (like ".") are
- # safe, in that they cannot be accessed or overwritten by clients
- # (whose binary storage_index values are always converted into a
- # filename with idlib.b2a)
- db_file = os.path.join(self.storedir, "owners.db")
- need_to_init_db = not os.path.exists(db_file)
- self._owner_db_con = sqlite.connect(db_file)
- self._owner_db_cur = self._owner_db_con.cursor()
- if need_to_init_db:
- setup_file = util.sibpath(__file__, "owner.sql")
- setup = open(setup_file, "r").read()
- self._owner_db_cur.executescript(setup)
-
def measure_size(self):
self.consumed = fileutil.du(self.sharedir)
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
- canary):
+ canary, owner_num=0):
+ # owner_num is not for clients to set, but rather it should be
+ # curried into the PersonalStorageServer instance that is dedicated
+ # to a particular owner.
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
si_s = idlib.b2a(storage_index)
+
+ # in this implementation, the lease information (including secrets)
+ # goes into the share files themselves. It could also be put into a
+ # separate database. Note that the lease should not be added until
+ # the BucketWrite has been closed.
+ expire_time = time.time() + 31*24*60*60
+ lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
+
space_per_bucket = allocated_size
no_limits = self.sizelimit is None
yes_limits = not no_limits
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
+ # add a lease
+ if os.path.exists(incominghome):
+ # TODO: add a lease to the still-in-construction share
+ pass
+ else:
+ sf = ShareFile(finalhome)
+ sf.add_lease(lease_info)
elif no_limits or remaining_space >= space_per_bucket:
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,
- space_per_bucket)
+ space_per_bucket, lease_info)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_s))
- # now store the secrets somewhere. This requires a
- # variable-length-list of (renew,cancel) secret tuples per bucket.
- # Note that this does not need to be kept inside the share itself, if
- # packing efficiency is a concern. For this implementation, we use a
- # sqlite database, which puts everything in a single file.
- self.add_lease(storage_index, renew_secret, cancel_secret)
-
return alreadygot, bucketwriters
- def add_lease(self, storage_index, renew_secret, cancel_secret):
- # is the bucket already in our database?
- cur = self._owner_db_cur
- cur.execute("SELECT bucket_id FROM buckets"
- " WHERE storage_index = ?",
- (storage_index,))
- res = cur.fetchone()
- if res:
- bucket_id = res[0]
- else:
- cur.execute("INSERT INTO buckets (storage_index)"
- " values(?)", (storage_index,))
- cur.execute("SELECT bucket_id FROM buckets"
- " WHERE storage_index = ?",
- (storage_index,))
- res = cur.fetchone()
- bucket_id = res[0]
-
- # what time will this lease expire? One month from now.
- expire_time = time.time() + 31*24*60*60
-
- # now, is this lease already in our database? Since we don't have
- # owners yet, look for a match by renew_secret/cancel_secret
- cur.execute("SELECT lease_id FROM leases"
- " WHERE renew_secret = ? AND cancel_secret = ?",
- (renew_secret, cancel_secret))
- res = cur.fetchone()
- if res:
- # yes, so just update the timestamp
- lease_id = res[0]
- cur.execute("UPDATE leases"
- " SET expire_time = ?"
- " WHERE lease_id = ?",
- (expire_time, lease_id))
- else:
- # no, we need to add the lease
- cur.execute("INSERT INTO leases "
- "(bucket_id, renew_secret, cancel_secret, expire_time)"
- " values(?,?,?,?)",
- (bucket_id, renew_secret, cancel_secret, expire_time))
- self._owner_db_con.commit()
+ def get_or_add_owner(self, owner):
+ # this will be more fully implemented when we get the Introduction
+ # protocol built. At that point, it should take the 'owner' argument
+ # (either a FURL or a Sealer/public-key) and look it up in a
+ # persistent table, returning a short integer. If the owner doesn't
+ # yet exist in the table, create a new entry for it and return the
+ # new index.
+ return 0
def remote_renew_lease(self, storage_index, renew_secret):
- # find the lease
- cur = self._owner_db_cur
- cur.execute("SELECT leases.lease_id FROM buckets, leases"
- " WHERE buckets.storage_index = ?"
- " AND buckets.bucket_id = leases.bucket_id"
- " AND leases.renew_secret = ?",
- (storage_index, renew_secret))
- res = cur.fetchone()
- if res:
- # found it, now update it. The new leases will expire one month
- # from now.
- expire_time = time.time() + 31*24*60*60
- lease_id = res[0]
- cur.execute("UPDATE leases"
- " SET expire_time = ?"
- " WHERE lease_id = ?",
- (expire_time, lease_id))
- else:
- # no such lease
- raise IndexError("No such lease")
- self._owner_db_con.commit()
+ new_expire_time = time.time() + 31*24*60*60
+ found_buckets = False
+ for shnum, filename in self._get_bucket_shares(storage_index):
+ found_buckets = True
+ sf = ShareFile(filename)
+ sf.renew_lease(renew_secret, new_expire_time)
+ if not found_buckets:
+ raise IndexError("no such lease to renew")
def remote_cancel_lease(self, storage_index, cancel_secret):
- # find the lease
- cur = self._owner_db_cur
- cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
- " FROM buckets b, leases l"
- " WHERE b.storage_index = ?"
- " AND b.bucket_id = l.bucket_id"
- " AND l.cancel_secret = ?",
- (storage_index, cancel_secret))
- res = cur.fetchone()
- if res:
- # found it
- lease_id, storage_index, bucket_id = res
- cur.execute("DELETE FROM leases WHERE lease_id = ?",
- (lease_id,))
- # was that the last one?
- cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
- (bucket_id,))
- res = cur.fetchone()
- remaining_leases = res[0]
- if not remaining_leases:
- # delete the share
- cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
- (bucket_id,))
- self.delete_bucket(storage_index)
- else:
- # no such lease
- raise IndexError("No such lease")
- self._owner_db_con.commit()
+ storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+
+ remaining_files = 0
+ total_space_freed = 0
+ found_buckets = False
+ for shnum, filename in self._get_bucket_shares(storage_index):
+ # note: if we can't find a lease on one share, we won't bother
+ # looking in the others. Unless something broke internally
+ # (perhaps we ran out of disk space while adding a lease), the
+ # leases on all shares will be identical.
+ found_buckets = True
+ sf = ShareFile(filename)
+ # this raises IndexError if the lease wasn't present
+ remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
+ total_space_freed += space_freed
+ if remaining_leases:
+ remaining_files += 1
+ else:
+ # now remove the sharefile. We'll almost certainly be
+ # removing the entire directory soon.
+ filelen = os.stat(filename)[stat.ST_SIZE]
+ os.unlink(filename)
+ total_space_freed += filelen
+ if not remaining_files:
+ os.rmdir(storagedir)
+ self.consumed -= total_space_freed
+ if not found_buckets:
+ raise IndexError("no such lease to cancel")
def delete_bucket(self, storage_index):
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
self.consumed += consumed_size
del self._active_writers[bw]
- def remote_get_buckets(self, storage_index):
- bucketreaders = {} # k: sharenum, v: BucketReader
+ def _get_bucket_shares(self, storage_index):
+ """Return a list of (shnum, pathname) tuples for files that hold
+ shares for this storage_index. In each tuple, 'shnum' will always be
+ the integer form of the last component of 'pathname'."""
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
try:
for f in os.listdir(storagedir):
if NUM_RE.match(f):
- br = BucketReader(os.path.join(storagedir, f))
- bucketreaders[int(f)] = br
+ filename = os.path.join(storagedir, f)
+ yield (int(f), filename)
except OSError:
# Commonly caused by there being no buckets at all.
pass
+ def remote_get_buckets(self, storage_index):
+ bucketreaders = {} # k: sharenum, v: BucketReader
+ for shnum, filename in self._get_bucket_shares(storage_index):
+ bucketreaders[shnum] = BucketReader(filename)
return bucketreaders
+ def get_leases(self, storage_index):
+ """Provide an iterator that yields all of the leases attached to this
+ bucket. Each lease is returned as a tuple of (owner_num,
+ renew_secret, cancel_secret, expiration_time).
+
+ This method is not for client use.
+ """
+
+ # since all shares get the same lease data, we just grab the leases
+ # from the first share
+ try:
+ shnum, filename = self._get_bucket_shares(storage_index).next()
+ sf = ShareFile(filename)
+ return sf.iter_leases()
+ except StopIteration:
+ return iter([])
+
+
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
-import os.path
+import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
def bucket_writer_closed(self, bw, consumed):
pass
+ def make_lease(self):
+ owner_num = 0
+ renew_secret = os.urandom(32)
+ cancel_secret = os.urandom(32)
+ expiration_time = time.time() + 5000
+ return (owner_num, renew_secret, cancel_secret, expiration_time)
+
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = BucketWriter(self, incoming, final, 200)
+ bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*25)
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = BucketWriter(self, incoming, final, 200)
+ bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*7) # last block may be short
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
- bw = BucketWriter(self, incoming, final, size)
+ bw = BucketWriter(self, incoming, final, size, self.make_lease())
rb = RemoteBucket()
rb.target = bw
return bw, rb, final
+ def make_lease(self):
+ owner_num = 0
+ renew_secret = os.urandom(32)
+ cancel_secret = os.urandom(32)
+ expiration_time = time.time() + 5000
+ return (owner_num, renew_secret, cancel_secret, expiration_time)
+
def bucket_writer_closed(self, bw, consumed):
pass
self.failUnlessEqual(set(writers.keys()), set([5]))
def test_sizelimits(self):
- ss = self.create("test_sizelimits", 100)
+ ss = self.create("test_sizelimits", 5000)
canary = Referenceable()
-
- already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
+ # a newly created and filled share incurs this much overhead, beyond
+ # the size we request.
+ OVERHEAD = 3*4
+ LEASE_SIZE = 4+32+32+4
+
+ already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
self.failUnlessEqual(len(writers), 3)
- # now the StorageServer should have 75 bytes provisionally allocated,
- # allowing only 25 more to be claimed
+ # now the StorageServer should have 3000 bytes provisionally
+ # allocated, allowing only 2000 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
- already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
+ # allocating 1001-byte shares only leaves room for one
+ already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
del already
del writers
self.failUnlessEqual(len(ss._active_writers), 1)
+ # now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should
- # become real, long-term allocation
+ # become real, long-term allocation, and grows to include the
+ # overhead.
for bw in writers2.values():
bw.remote_write(0, "a"*25)
bw.remote_close()
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
- # now there should be 25 bytes allocated, and 75 free
- already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25)
- self.failUnlessEqual(len(writers3), 3)
- self.failUnlessEqual(len(ss._active_writers), 3)
+ allocated = 1001 + OVERHEAD + LEASE_SIZE
+ # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
+ # 5000-1085=3915 free, therefore we can fit 39 100byte shares
+ already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
+ self.failUnlessEqual(len(writers3), 39)
+ self.failUnlessEqual(len(ss._active_writers), 39)
del already3
del writers3
del ss
# creating a new StorageServer in the same directory should see the
- # same usage. note that metadata will be counted at startup but not
- # during runtime, so if we were creating any metadata, the allocation
- # would be more than 25 bytes and this test would need to be changed.
- ss = self.create("test_sizelimits", 100)
- already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
- self.failUnlessEqual(len(writers4), 3)
- self.failUnlessEqual(len(ss._active_writers), 3)
+ # same usage.
+
+ # metadata that goes into the share file is counted upon share close,
+ # as well as at startup. metadata that goes into other files will not
+ # be counted until the next startup, so if we were creating any
+ # extra-file metadata, the allocation would be more than 'allocated'
+ # and this test would need to be changed.
+ ss = self.create("test_sizelimits", 5000)
+ already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
+ self.failUnlessEqual(len(writers4), 39)
+ self.failUnlessEqual(len(ss._active_writers), 39)
+
+ def test_seek(self):
+ basedir = self.workdir("test_seek_behavior")
+ fileutil.make_dirs(basedir)
+ filename = os.path.join(basedir, "testfile")
+ f = open(filename, "wb")
+ f.write("start")
+ f.close()
+ # mode="w" allows seeking-to-create-holes, but truncates pre-existing
+ # files. mode="a" preserves previous contents but does not allow
+ # seeking-to-create-holes. mode="r+" allows both.
+ f = open(filename, "rb+")
+ f.seek(100)
+ f.write("100")
+ f.close()
+ filelen = os.stat(filename)[stat.ST_SIZE]
+ self.failUnlessEqual(filelen, 100+3)
+ f2 = open(filename, "rb")
+ self.failUnlessEqual(f2.read(5), "start")
+
def test_leases(self):
ss = self.create("test_leases")
for wb in writers.values():
wb.remote_close()
+ leases = list(ss.get_leases("si0"))
+ self.failUnlessEqual(len(leases), 1)
+ self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
+
rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
hashutil.tagged_hash("blah", "%d" % self._secret.next()))
already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
self.failUnlessEqual(len(already), 5)
self.failUnlessEqual(len(writers), 0)
+ leases = list(ss.get_leases("si1"))
+ self.failUnlessEqual(len(leases), 2)
+ self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
+
# check that si0 is readable
readers = ss.remote_get_buckets("si0")
self.failUnlessEqual(len(readers), 5)
# the corresponding renew should no longer work
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
+ leases = list(ss.get_leases("si1"))
+ self.failUnlessEqual(len(leases), 1)
+ self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
+
ss.remote_renew_lease("si1", rs2)
# cancelling the second should make it go away
ss.remote_cancel_lease("si1", cs2)
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
+ leases = list(ss.get_leases("si1"))
+ self.failUnlessEqual(len(leases), 0)
+