sia = si_b2a(storageindex)
return os.path.join(sia[:2], sia)
+class LeaseInfo:
+ def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
+ expiration_time=None, nodeid=None):
+ self.owner_num = owner_num
+ self.renew_secret = renew_secret
+ self.cancel_secret = cancel_secret
+ self.expiration_time = expiration_time
+ if nodeid is not None:
+ assert isinstance(nodeid, str)
+ assert len(nodeid) == 20
+ self.nodeid = nodeid
+
+ def from_immutable_data(self, data):
+ (self.owner_num,
+ self.renew_secret,
+ self.cancel_secret,
+ self.expiration_time) = struct.unpack(">L32s32sL", data)
+ self.nodeid = None
+ return self
+ def to_immutable_data(self):
+ return struct.pack(">L32s32sL",
+ self.owner_num,
+ self.renew_secret, self.cancel_secret,
+ int(self.expiration_time))
+
+ def to_mutable_data(self):
+ return struct.pack(">LL32s32s20s",
+ self.owner_num,
+ int(self.expiration_time),
+ self.renew_secret, self.cancel_secret,
+ self.nodeid)
+ def from_mutable_data(self, data):
+ (self.owner_num,
+ self.expiration_time,
+ self.renew_secret, self.cancel_secret,
+ self.nodeid) = struct.unpack(">LL32s32s20s", data)
+ return self
+
+
class ShareFile:
LEASE_SIZE = struct.calcsize(">L32s32sL")
self._data_offset = 0xc
self._lease_offset = 0xc + self._size
+ def unlink(self):
+ os.unlink(self.home)
+
def read_share_data(self, offset, length):
precondition(offset >= 0)
precondition(offset+length <= self._size)
f.close()
def _write_lease_record(self, f, lease_number, lease_info):
- (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
offset = self._lease_offset + lease_number * self.LEASE_SIZE
f.seek(offset)
assert f.tell() == offset
- f.write(struct.pack(">L32s32sL",
- owner_num, renew_secret, cancel_secret,
- int(expiration_time)))
+ f.write(lease_info.to_immutable_data())
def _read_num_leases(self, f):
f.seek(0x08)
for i in range(num_leases):
data = f.read(self.LEASE_SIZE)
if data:
- yield struct.unpack(">L32s32sL", data)
+ yield LeaseInfo().from_immutable_data(data)
def add_lease(self, lease_info):
f = open(self.home, 'rb+')
f.close()
def renew_lease(self, renew_secret, new_expire_time):
- for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
- if rs == renew_secret:
+ for i,lease in enumerate(self.iter_leases()):
+ if lease.renew_secret == renew_secret:
# yup. See if we need to update the owner time.
- if new_expire_time > et:
+ if new_expire_time > lease.expiration_time:
# yes
- new_lease = (on,rs,cs,new_expire_time)
+ lease.expiration_time = new_expire_time
f = open(self.home, 'rb+')
- self._write_lease_record(f, i, new_lease)
+ self._write_lease_record(f, i, lease)
f.close()
return
raise IndexError("unable to renew non-existent lease")
def add_or_renew_lease(self, lease_info):
- owner_num, renew_secret, cancel_secret, expire_time = lease_info
try:
- self.renew_lease(renew_secret, expire_time)
+ self.renew_lease(lease_info.renew_secret,
+ lease_info.expiration_time)
except IndexError:
self.add_lease(lease_info)
+
def cancel_lease(self, cancel_secret):
- """Remove a lease with the given cancel_secret. Return
- (num_remaining_leases, space_freed). Raise IndexError if there was no
- lease with the given cancel_secret."""
+ """Remove a lease with the given cancel_secret. If the last lease is
+ cancelled, the file will be removed. Return the number of bytes that
+ were freed (by truncating the list of leases, and possibly by
+ deleting the file). Raise IndexError if there was no lease with the
+ given cancel_secret.
+ """
leases = list(self.iter_leases())
num_leases = len(leases)
num_leases_removed = 0
- for i,lease_info in enumerate(leases[:]):
- (on,rs,cs,et) = lease_info
- if cs == cancel_secret:
+ for i,lease in enumerate(leases[:]):
+ if lease.cancel_secret == cancel_secret:
leases[i] = None
num_leases_removed += 1
if not num_leases_removed:
self._write_num_leases(f, len(leases))
self._truncate_leases(f, len(leases))
f.close()
- return len(leases), self.LEASE_SIZE * num_leases_removed
+ space_freed = self.LEASE_SIZE * num_leases_removed
+ if not len(leases):
+ space_freed += os.stat(self.home)[stat.ST_SIZE]
+ self.unlink()
+ return space_freed
class BucketWriter(Referenceable):
# extra leases go here, none at creation
f.close()
+ def unlink(self):
+ os.unlink(self.home)
+
def _read_data_length(self, f):
f.seek(self.DATA_LENGTH_OFFSET)
(data_length,) = struct.unpack(">Q", f.read(8))
return
def _write_lease_record(self, f, lease_number, lease_info):
- (ownerid, expiration_time,
- renew_secret, cancel_secret, nodeid) = lease_info
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
if lease_number < 4:
+ (lease_number-4)*self.LEASE_SIZE)
f.seek(offset)
assert f.tell() == offset
- f.write(struct.pack(">LL32s32s20s",
- ownerid, int(expiration_time),
- renew_secret, cancel_secret, nodeid))
+ f.write(lease_info.to_mutable_data())
def _read_lease_record(self, f, lease_number):
- # returns a 5-tuple of lease info, or None
+ # returns a LeaseInfo instance, or None
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
if lease_number < 4:
f.seek(offset)
assert f.tell() == offset
data = f.read(self.LEASE_SIZE)
- lease_info = struct.unpack(">LL32s32s20s", data)
- (ownerid, expiration_time,
- renew_secret, cancel_secret, nodeid) = lease_info
- if ownerid == 0:
+ lease_info = LeaseInfo().from_mutable_data(data)
+ if lease_info.owner_num == 0:
return None
return lease_info
def renew_lease(self, renew_secret, new_expire_time):
accepting_nodeids = set()
f = open(self.home, 'rb+')
- for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
- if rs == renew_secret:
+ for (leasenum,lease) in self._enumerate_leases(f):
+ if lease.renew_secret == renew_secret:
# yup. See if we need to update the owner time.
- if new_expire_time > et:
+ if new_expire_time > lease.expiration_time:
# yes
- new_lease = (oid,new_expire_time,rs,cs,anid)
- self._write_lease_record(f, leasenum, new_lease)
+ lease.expiration_time = new_expire_time
+ self._write_lease_record(f, leasenum, lease)
f.close()
return
- accepting_nodeids.add(anid)
+ accepting_nodeids.add(lease.nodeid)
f.close()
# Return the accepting_nodeids set, to give the client a chance to
# update the leases on a share which has been migrated from its
raise IndexError(msg)
def add_or_renew_lease(self, lease_info):
- ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
try:
- self.renew_lease(renew_secret, expire_time)
+ self.renew_lease(lease_info.renew_secret,
+ lease_info.expiration_time)
except IndexError:
self.add_lease(lease_info)
def cancel_lease(self, cancel_secret):
- """Remove any leases with the given cancel_secret. Return
- (num_remaining_leases, space_freed). Raise IndexError if there was no
- lease with the given cancel_secret."""
+ """Remove any leases with the given cancel_secret. If the last lease
+ is cancelled, the file will be removed. Return the number of bytes
+ that were freed (by truncating the list of leases, and possibly by
+ deleting the file). Raise IndexError if there was no lease with the
+ given cancel_secret."""
accepting_nodeids = set()
modified = 0
remaining = 0
- blank_lease = (0, 0, "\x00"*32, "\x00"*32, "\x00"*20)
+ blank_lease = LeaseInfo(owner_num=0,
+ renew_secret="\x00"*32,
+ cancel_secret="\x00"*32,
+ expiration_time=0,
+ nodeid="\x00"*20)
f = open(self.home, 'rb+')
- for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
- accepting_nodeids.add(anid)
- if cs == cancel_secret:
+ for (leasenum,lease) in self._enumerate_leases(f):
+ accepting_nodeids.add(lease.nodeid)
+ if lease.cancel_secret == cancel_secret:
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
else:
if modified:
freed_space = self._pack_leases(f)
f.close()
- return (remaining, freed_space)
+ if not remaining:
+ freed_space += os.stat(self.home)[stat.ST_SIZE]
+ self.unlink()
+ return freed_space
+
msg = ("Unable to cancel non-existent lease. I have leases "
"accepted by nodeids: ")
msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
(idlib.nodeid_b2a(write_enabler_nodeid),)
raise BadWriteEnablerError(msg)
+ def update_write_enabler(self, old_write_enabler, new_write_enabler,
+ my_nodeid, si_s):
+ self.check_write_enabler(old_write_enabler, si_s)
+ f = open(self.home, 'rb+')
+ f.seek(0)
+ header = struct.pack(">32s20s32s",
+ self.MAGIC, my_nodeid, new_write_enabler)
+ f.write(header)
+ f.close()
+
def check_testv(self, testv):
test_good = True
f = open(self.home, 'rb+')
# separate database. Note that the lease should not be added until
# the BucketWrite has been closed.
expire_time = time.time() + 31*24*60*60
- lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
+ lease_info = LeaseInfo(owner_num,
+ renew_secret, cancel_secret,
+ expire_time, self.my_nodeid)
space_per_bucket = allocated_size
no_limits = self.sizelimit is None
self.add_latency("allocate", time.time() - start)
return alreadygot, bucketwriters
- def remote_renew_lease(self, storage_index, renew_secret):
- start = time.time()
- self.count("renew")
- new_expire_time = time.time() + 31*24*60*60
- found_buckets = False
+ def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
- found_buckets = True
f = open(filename, 'rb')
header = f.read(32)
f.close()
elif header[:4] == struct.pack(">L", 1):
sf = ShareFile(filename)
else:
- pass # non-sharefile
+ continue # non-sharefile
+ yield sf
+
+ def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
+ owner_num=0):
+ start = time.time()
+ self.count("add-lease")
+ new_expire_time = time.time() + 31*24*60*60
+ lease_info = LeaseInfo(owner_num,
+ renew_secret, cancel_secret,
+ new_expire_time, self.my_nodeid)
+ found_buckets = False
+ for sf in self._iter_share_files(storage_index):
+ found_buckets = True
+ # note: if the share has been migrated, the renew_lease()
+ # call will throw an exception, with information to help the
+ # client update the lease.
+ sf.add_or_renew_lease(lease_info)
+ self.add_latency("add-lease", time.time() - start)
+ if not found_buckets:
+ raise IndexError("no such storage index to do add-lease")
+
+
+ def remote_renew_lease(self, storage_index, renew_secret):
+ start = time.time()
+ self.count("renew")
+ new_expire_time = time.time() + 31*24*60*60
+ found_buckets = False
+ for sf in self._iter_share_files(storage_index):
+ found_buckets = True
sf.renew_lease(renew_secret, new_expire_time)
self.add_latency("renew", time.time() - start)
if not found_buckets:
def remote_cancel_lease(self, storage_index, cancel_secret):
start = time.time()
self.count("cancel")
- storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
- remaining_files = 0
total_space_freed = 0
found_buckets = False
- for shnum, filename in self._get_bucket_shares(storage_index):
+ for sf in self._iter_share_files(storage_index):
# note: if we can't find a lease on one share, we won't bother
# looking in the others. Unless something broke internally
# (perhaps we ran out of disk space while adding a lease), the
# leases on all shares will be identical.
found_buckets = True
- f = open(filename, 'rb')
- header = f.read(32)
- f.close()
- if header[:32] == MutableShareFile.MAGIC:
- sf = MutableShareFile(filename, self)
- # note: if the share has been migrated, the renew_lease()
- # call will throw an exception, with information to help the
- # client update the lease.
- elif header[:4] == struct.pack(">L", 1):
- sf = ShareFile(filename)
- else:
- pass # non-sharefile
- # this raises IndexError if the lease wasn't present
- remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
- total_space_freed += space_freed
- if remaining_leases:
- remaining_files += 1
- else:
- # now remove the sharefile. We'll almost certainly be
- # removing the entire directory soon.
- filelen = os.stat(filename)[stat.ST_SIZE]
- os.unlink(filename)
- total_space_freed += filelen
- if not remaining_files:
- fileutil.rm_dir(storagedir)
+ # XXX: this raises IndexError if the lease wasn't present
+ total_space_freed += sf.cancel_lease(cancel_secret)
+
+ if found_buckets:
+ storagedir = os.path.join(self.sharedir,
+ storage_index_to_dir(storage_index))
+ if not os.listdir(storagedir):
+ os.rmdir(storagedir)
+
if self.consumed is not None:
self.consumed -= total_space_freed
if self.stats_provider:
- self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
+ self.stats_provider.count('storage_server.bytes_freed',
+ total_space_freed)
self.add_latency("cancel", time.time() - start)
if not found_buckets:
- raise IndexError("no such lease to cancel")
+ raise IndexError("no such storage index")
def bucket_writer_closed(self, bw, consumed_size):
if self.consumed is not None:
# and update the leases on all shares
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
- my_nodeid = self.my_nodeid
- anid = my_nodeid
- lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
- anid)
+ lease_info = LeaseInfo(ownerid,
+ renew_secret, cancel_secret,
+ expire_time, self.my_nodeid)
for share in shares.values():
share.add_or_renew_lease(lease_info)
self.add_latency("readv", time.time() - start)
return datavs
+ def remote_update_write_enabler(self, storage_index,
+ old_write_enabler, new_write_enabler):
+ si_s = si_b2a(storage_index)
+ for sf in self._iter_share_files(storage_index):
+ if not isinstance(sf, MutableShareFile):
+ continue
+ sf.update_write_enabler(old_write_enabler, new_write_enabler,
+ self.my_nodeid, si_s)
# the code before here runs on the storage server, not the client
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
- storage_index_to_dir, DataTooLargeError
+ storage_index_to_dir, DataTooLargeError, LeaseInfo
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
renew_secret = os.urandom(32)
cancel_secret = os.urandom(32)
expiration_time = time.time() + 5000
- return (owner_num, renew_secret, cancel_secret, expiration_time)
+ return LeaseInfo(owner_num, renew_secret, cancel_secret,
+ expiration_time, "\x00" * 20)
def test_create(self):
incoming, final = self.make_workdir("test_create")
renew_secret = os.urandom(32)
cancel_secret = os.urandom(32)
expiration_time = time.time() + 5000
- return (owner_num, renew_secret, cancel_secret, expiration_time)
+ return LeaseInfo(owner_num, renew_secret, cancel_secret,
+ expiration_time, "\x00" * 20)
def bucket_writer_closed(self, bw, consumed):
pass
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit,
stats_provider=FakeStatsProvider())
+ ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss
leases = list(ss.get_leases("si0"))
self.failUnlessEqual(len(leases), 1)
- self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
+ self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
leases = list(ss.get_leases("si1"))
self.failUnlessEqual(len(leases), 2)
- self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
+ self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
# check that si0 is readable
readers = ss.remote_get_buckets("si0")
leases = list(ss.get_leases("si1"))
self.failUnlessEqual(len(leases), 1)
- self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
+ self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
ss.remote_renew_lease("si1", rs2)
# cancelling the second should make it go away
def test_readonly(self):
workdir = self.workdir("test_readonly")
ss = StorageServer(workdir, readonly_storage=True)
+ ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
canary = FakeCanary()
# discard is really only used for other tests, but we test it anyways
workdir = self.workdir("test_discard")
ss = StorageServer(workdir, discard_storage=True)
+ ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
canary = FakeCanary()
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
ss.setServiceParent(self.sparent)
- ss.setNodeID("\x00" * 32)
+ ss.setNodeID("\x00" * 20)
return ss
def test_create(self):
1: ["1"*10],
2: ["2"*10]})
- def compare_leases_without_timestamps(self, a, b):
- self.failUnlessEqual(len(a), len(b))
- for i in range(len(a)):
- (num_a, (ownerid_a, expiration_time_a,
- renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
- (num_b, (ownerid_b, expiration_time_b,
- renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
- self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
- cancel_secret_a, nodeid_a),
- (num_b, ownerid_b, renew_secret_b,
- cancel_secret_b, nodeid_b) )
+ def compare_leases_without_timestamps(self, leases_a, leases_b):
+ self.failUnlessEqual(len(leases_a), len(leases_b))
+ for i in range(len(leases_a)):
+ num_a, a = leases_a[i]
+ num_b, b = leases_b[i]
+ self.failUnlessEqual(num_a, num_b)
+ self.failUnlessEqual(a.owner_num, b.owner_num)
+ self.failUnlessEqual(a.renew_secret, b.renew_secret)
+ self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
+ self.failUnlessEqual(a.nodeid, b.nodeid)
+
+ def compare_leases(self, leases_a, leases_b):
+ self.failUnlessEqual(len(leases_a), len(leases_b))
+ for i in range(len(leases_a)):
+ num_a, a = leases_a[i]
+ num_b, b = leases_b[i]
+ self.failUnlessEqual(num_a, num_b)
+ self.failUnlessEqual(a.owner_num, b.owner_num)
+ self.failUnlessEqual(a.renew_secret, b.renew_secret)
+ self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
+ self.failUnlessEqual(a.nodeid, b.nodeid)
+ self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
ss = self.create("test_leases", sizelimit=1000*1000)
all_leases = s0.debug_get_leases()
# renewing with a bogus token should prompt an error message
- # TODO: examine the exception thus raised, make sure the old nodeid
- # is present, to provide for share migration
- self.failUnlessRaises(IndexError,
- ss.remote_renew_lease, "si1",
- secrets(20)[1])
+ # examine the exception thus raised, make sure the old nodeid is
+ # present, to provide for share migration
+ e = self.failUnlessRaises(IndexError,
+ ss.remote_renew_lease, "si1",
+ secrets(20)[1])
+ e_s = str(e)
+ self.failUnless("Unable to renew non-existent lease" in e_s)
+ self.failUnless("I have leases accepted by nodeids:" in e_s)
+ self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
+
# same for cancelling
self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si1",
secrets(20)[2])
- self.failUnlessEqual(all_leases, s0.debug_get_leases())
+ self.compare_leases(all_leases, s0.debug_get_leases())
# reading shares should not modify the timestamp
read("si1", [], [(0,200)])
- self.failUnlessEqual(all_leases, s0.debug_get_leases())
+ self.compare_leases(all_leases, s0.debug_get_leases())
write("si1", secrets(0),
{0: ([], [(200, "make me bigger")], None)}, [])
self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si2", "nonsecret")
+ def test_update_write_enabler(self):
+ ss = self.create("test_update_write_enabler", sizelimit=1000*1000)
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1-0"),
+ self.cancel_secret("we1-0") )
+ old_write_enabler = secrets[0]
+ new_write_enabler = self.write_enabler("we2")
+ new_secrets = (new_write_enabler, secrets[1], secrets[2])
+
+ data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+ write = ss.remote_slot_testv_and_readv_and_writev
+ read = ss.remote_slot_readv
+ update_write_enabler = ss.remote_update_write_enabler
+
+ rc = write("si1", secrets, {0: ([], [(0,data)], None)}, [])
+ self.failUnlessEqual(rc, (True, {}))
+
+ rc = write("si1", secrets, {0: ([], [(1,data)], None)}, [])
+ self.failUnlessEqual(rc[0], True)
+
+ f = self.failUnlessRaises(BadWriteEnablerError,
+ write, "si1", new_secrets,
+ {}, [])
+ self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
+ ss.setNodeID("\xff" * 20)
+
+ rc = update_write_enabler("si1", old_write_enabler, new_write_enabler)
+ self.failUnlessEqual(rc, None)
+
+ f = self.failUnlessRaises(BadWriteEnablerError,
+ write, "si1", secrets,
+ {}, [])
+ self.failUnless("The write enabler was recorded by nodeid '77777777777777777777777777777777'." in f, f)
+
+ rc = write("si1", new_secrets, {0: ([], [(2,data)], None)}, [])
+ self.failUnlessEqual(rc[0], True)
+
class Stats(unittest.TestCase):
def create(self, name, sizelimit=None):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
+ ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss