from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
- storage_index_to_dir
+ storage_index_to_dir, DataTooLargeError
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
+class Marker:
+ pass
class FakeCanary:
- def notifyOnDisconnect(self, *args, **kwargs):
- pass
+ def __init__(self, ignore_disconnectors=False):
+ self.ignore = ignore_disconnectors
+ self.disconnectors = {}
+ def notifyOnDisconnect(self, f, *args, **kwargs):
+ if self.ignore:
+ return
+ m = Marker()
+ self.disconnectors[m] = (f, args, kwargs)
+ return m
def dontNotifyOnDisconnect(self, marker):
+ if self.ignore:
+ return
+ del self.disconnectors[marker]
+
+class FakeStatsProvider:
+ def count(self, name, delta=1):
+ pass
+ def register_producer(self, producer):
pass
class Bucket(unittest.TestCase):
br = BucketReader(self, sharefname)
rb = RemoteBucket()
rb.target = br
- rbp = ReadBucketProxy(rb)
+ rbp = ReadBucketProxy(rb, peerid="abc")
+ self.failUnless("to peer" in repr(rbp))
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
d1 = rbp.startIfNecessary()
+ d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
d1.addCallback(lambda res: rbp.get_block(0))
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
d1.addCallback(lambda res: rbp.get_block(1))
def create(self, name, sizelimit=None):
workdir = self.workdir(name)
- ss = StorageServer(workdir, sizelimit)
+ ss = StorageServer(workdir, sizelimit,
+ stats_provider=FakeStatsProvider())
ss.setServiceParent(self.sparent)
return ss
def test_create(self):
ss = self.create("test_create")
- def allocate(self, ss, storage_index, sharenums, size):
+ def allocate(self, ss, storage_index, sharenums, size, canary=None):
renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
+ if not canary:
+ canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
- sharenums, size, FakeCanary())
+ sharenums, size, canary)
def test_dont_overfill_dirs(self):
"""
def test_allocate(self):
ss = self.create("test_allocate")
- self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+ self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
canary = FakeCanary()
- already,writers = self.allocate(ss, "vid", [0,1,2], 75)
+ already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# while the buckets are open, they should not count as readable
- self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+ self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
# close the buckets
for i,wb in writers.items():
wb.remote_write(0, "%25d" % i)
wb.remote_close()
+ # aborting a bucket that was already closed is a no-op
+ wb.remote_abort()
# now they should be readable
- b = ss.remote_get_buckets("vid")
+ b = ss.remote_get_buckets("allocate")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
- # now if we about writing again, the server should offer those three
- # buckets as already present. It should offer them even if we don't
- # ask about those specific ones.
- already,writers = self.allocate(ss, "vid", [2,3,4], 75)
+ # now if we ask about writing again, the server should offer those
+ # three buckets as already present. It should offer them even if we
+ # don't ask about those specific ones.
+ already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# while those two buckets are open for writing, the server should
# refuse to offer them to uploaders
- already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
- self.failUnlessEqual(already, set([0,1,2]))
- self.failUnlessEqual(set(writers.keys()), set([5]))
+ already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+ self.failUnlessEqual(already2, set([0,1,2]))
+ self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+ # aborting the writes should remove the tempfiles
+ for i,wb in writers2.items():
+ wb.remote_abort()
+ already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+ self.failUnlessEqual(already2, set([0,1,2]))
+ self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+ for i,wb in writers2.items():
+ wb.remote_abort()
+ for i,wb in writers.items():
+ wb.remote_abort()
+
+ def test_disconnect(self):
+ # simulate a disconnection
+ ss = self.create("test_disconnect")
+ canary = FakeCanary()
+ already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
+ self.failUnlessEqual(already, set())
+ self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
+ for (f,args,kwargs) in canary.disconnectors.values():
+ f(*args, **kwargs)
+ del already
+ del writers
+
+ # that ought to delete the incoming shares
+ already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
+ self.failUnlessEqual(already, set())
+ self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
def test_sizelimits(self):
ss = self.create("test_sizelimits", 5000)
- canary = FakeCanary()
# a newly created and filled share incurs this much overhead, beyond
# the size we request.
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
-
- already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
+ canary = FakeCanary(True)
+ already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
# allocating 1001-byte shares only leaves room for one
- already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
+ already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
allocated = 1001 + OVERHEAD + LEASE_SIZE
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
- already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
+ already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._active_writers), 39)
# extra-file metadata, the allocation would be more than 'allocated'
# and this test would need to be changed.
ss = self.create("test_sizelimits", 5000)
- already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
+ already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
self.failUnlessEqual(len(writers4), 39)
self.failUnlessEqual(len(ss._active_writers), 39)
self.failUnless(isinstance(readv_data, dict))
self.failUnlessEqual(len(readv_data), 0)
+ def test_container_size(self):
+ ss = self.create("test_container_size")
+ self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+ set([0,1,2]), 100)
+ rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1"),
+ self.cancel_secret("we1") )
+ data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+ answer = rstaraw("si1", secrets,
+ {0: ([], [(0,data)], len(data)+12)},
+ [])
+ self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+ # trying to make the container too large will raise an exception
+ TOOBIG = MutableShareFile.MAX_SIZE + 10
+ self.failUnlessRaises(DataTooLargeError,
+ rstaraw, "si1", secrets,
+ {0: ([], [(0,data)], TOOBIG)},
+ [])
+
+ # it should be possible to make the container smaller, although at
+ # the moment this doesn't actually affect the share
+ answer = rstaraw("si1", secrets,
+ {0: ([], [(0,data)], len(data)+8)},
+ [])
+ self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
def test_allocate(self):
ss = self.create("test_allocate")
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
- set([0,1,2]), 100)
+ set([0,1,2]), 100)
read = ss.remote_slot_readv
self.failUnlessEqual(read("si1", [0], [(0, 10)]),
cancel_secret_b, nodeid_b) )
def test_leases(self):
- ss = self.create("test_leases")
+ ss = self.create("test_leases", sizelimit=1000*1000)
def secrets(n):
return ( self.write_enabler("we1"),
self.renew_secret("we1-%d" % n),
self.failUnlessEqual(len(remaining_shares), 1)
self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ # cancelling a non-existent lease should raise an IndexError
+ self.failUnlessRaises(IndexError,
+ ss.remote_cancel_lease, "si1", "nonsecret")
+
+ # and the slot should still be there
+ remaining_shares = read("si1", [], [(0,10)])
+ self.failUnlessEqual(len(remaining_shares), 1)
+ self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+
ss.remote_cancel_lease("si1", secrets(4)[2])
# now the slot should be gone
no_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(no_shares, {})
+ # cancelling a lease on a non-existent share should raise an IndexError
+ self.failUnlessRaises(IndexError,
+ ss.remote_cancel_lease, "si2", "nonsecret")
+
+
class Stats(unittest.TestCase):
def setUp(self):