From 6b55b8b0223150ec35e6cd3c10b963805619e6b3 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 17 Jun 2008 17:01:42 -0700 Subject: [PATCH] test_storage.py: improve test coverage --- src/allmydata/test/test_storage.py | 146 ++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 26 deletions(-) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 8d92db98..c517f5e2 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -8,14 +8,31 @@ from allmydata import interfaces from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \ - storage_index_to_dir + storage_index_to_dir, DataTooLargeError from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent +class Marker: + pass class FakeCanary: - def notifyOnDisconnect(self, *args, **kwargs): - pass + def __init__(self, ignore_disconnectors=False): + self.ignore = ignore_disconnectors + self.disconnectors = {} + def notifyOnDisconnect(self, f, *args, **kwargs): + if self.ignore: + return + m = Marker() + self.disconnectors[m] = (f, args, kwargs) + return m def dontNotifyOnDisconnect(self, marker): + if self.ignore: + return + del self.disconnectors[marker] + +class FakeStatsProvider: + def count(self, name, delta=1): + pass + def register_producer(self, producer): pass class Bucket(unittest.TestCase): @@ -158,10 +175,12 @@ class BucketProxy(unittest.TestCase): br = BucketReader(self, sharefname) rb = RemoteBucket() rb.target = br - rbp = ReadBucketProxy(rb) + rbp = ReadBucketProxy(rb, peerid="abc") + self.failUnless("to peer" in repr(rbp)) self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp)) d1 = rbp.startIfNecessary() + d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent d1.addCallback(lambda res: rbp.get_block(0)) d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25)) d1.addCallback(lambda res: rbp.get_block(1)) @@ -207,19 +226,22 @@ class Server(unittest.TestCase): def create(self, name, sizelimit=None): workdir = self.workdir(name) - ss = StorageServer(workdir, sizelimit) + ss = StorageServer(workdir, sizelimit, + stats_provider=FakeStatsProvider()) ss.setServiceParent(self.sparent) return ss def test_create(self): ss = self.create("test_create") - def allocate(self, ss, storage_index, sharenums, size): + def allocate(self, ss, storage_index, sharenums, size, canary=None): renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()) cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()) + if not canary: + canary = FakeCanary() return ss.remote_allocate_buckets(storage_index, renew_secret, cancel_secret, - sharenums, size, FakeCanary()) + sharenums, size, canary) def test_dont_overfill_dirs(self): """ @@ -259,56 +281,86 @@ class Server(unittest.TestCase): def test_allocate(self): ss = self.create("test_allocate") - self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) + self.failUnlessEqual(ss.remote_get_buckets("allocate"), {}) canary = FakeCanary() - already,writers = self.allocate(ss, "vid", [0,1,2], 75) + already,writers = self.allocate(ss, "allocate", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) # while the buckets are open, they should not count as readable - self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) + self.failUnlessEqual(ss.remote_get_buckets("allocate"), {}) # close the buckets for i,wb in writers.items(): wb.remote_write(0, "%25d" % i) wb.remote_close() + # aborting a bucket that was already closed is a no-op + wb.remote_abort() # now they should be readable - b = ss.remote_get_buckets("vid") + b = ss.remote_get_buckets("allocate") self.failUnlessEqual(set(b.keys()), set([0,1,2])) self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0) - # now if we about writing again, the server should offer those three - # buckets as already present. It should offer them even if we don't - # ask about those specific ones. - already,writers = self.allocate(ss, "vid", [2,3,4], 75) + # now if we ask about writing again, the server should offer those + # three buckets as already present. It should offer them even if we + # don't ask about those specific ones. + already,writers = self.allocate(ss, "allocate", [2,3,4], 75) self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([3,4])) # while those two buckets are open for writing, the server should # refuse to offer them to uploaders - already,writers = self.allocate(ss, "vid", [2,3,4,5], 75) - self.failUnlessEqual(already, set([0,1,2])) - self.failUnlessEqual(set(writers.keys()), set([5])) + already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75) + self.failUnlessEqual(already2, set([0,1,2])) + self.failUnlessEqual(set(writers2.keys()), set([5])) + + # aborting the writes should remove the tempfiles + for i,wb in writers2.items(): + wb.remote_abort() + already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75) + self.failUnlessEqual(already2, set([0,1,2])) + self.failUnlessEqual(set(writers2.keys()), set([5])) + + for i,wb in writers2.items(): + wb.remote_abort() + for i,wb in writers.items(): + wb.remote_abort() + + def test_disconnect(self): + # simulate a disconnection + ss = self.create("test_disconnect") + canary = FakeCanary() + already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary) + self.failUnlessEqual(already, set()) + self.failUnlessEqual(set(writers.keys()), set([0,1,2])) + for (f,args,kwargs) in canary.disconnectors.values(): + f(*args, **kwargs) + del already + del writers + + # that ought to delete the incoming shares + already,writers = self.allocate(ss, "disconnect", [0,1,2], 75) + self.failUnlessEqual(already, set()) + self.failUnlessEqual(set(writers.keys()), set([0,1,2])) def test_sizelimits(self): ss = self.create("test_sizelimits", 5000) - canary = FakeCanary() # a newly created and filled share incurs this much overhead, beyond # the size we request. OVERHEAD = 3*4 LEASE_SIZE = 4+32+32+4 - - already,writers = self.allocate(ss, "vid1", [0,1,2], 1000) + canary = FakeCanary(True) + already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed self.failUnlessEqual(len(ss._active_writers), 3) # allocating 1001-byte shares only leaves room for one - already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001) + already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary) self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(ss._active_writers), 4) @@ -333,7 +385,7 @@ class Server(unittest.TestCase): allocated = 1001 + OVERHEAD + LEASE_SIZE # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # 5000-1085=3915 free, therefore we can fit 39 100byte shares - already3,writers3 = self.allocate(ss,"vid3", range(100), 100) + already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary) self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(ss._active_writers), 39) @@ -352,7 +404,7 @@ class Server(unittest.TestCase): # extra-file metadata, the allocation would be more than 'allocated' # and this test would need to be changed. ss = self.create("test_sizelimits", 5000) - already4,writers4 = self.allocate(ss, "vid4", range(100), 100) + already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary) self.failUnlessEqual(len(writers4), 39) self.failUnlessEqual(len(ss._active_writers), 39) @@ -569,10 +621,38 @@ class MutableServer(unittest.TestCase): self.failUnless(isinstance(readv_data, dict)) self.failUnlessEqual(len(readv_data), 0) + def test_container_size(self): + ss = self.create("test_container_size") + self.allocate(ss, "si1", "we1", self._lease_secret.next(), + set([0,1,2]), 100) + rstaraw = ss.remote_slot_testv_and_readv_and_writev + secrets = ( self.write_enabler("we1"), + self.renew_secret("we1"), + self.cancel_secret("we1") ) + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + answer = rstaraw("si1", secrets, + {0: ([], [(0,data)], len(data)+12)}, + []) + self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) ) + + # trying to make the container too large will raise an exception + TOOBIG = MutableShareFile.MAX_SIZE + 10 + self.failUnlessRaises(DataTooLargeError, + rstaraw, "si1", secrets, + {0: ([], [(0,data)], TOOBIG)}, + []) + + # it should be possible to make the container smaller, although at + # the moment this doesn't actually affect the share + answer = rstaraw("si1", secrets, + {0: ([], [(0,data)], len(data)+8)}, + []) + self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) ) + def test_allocate(self): ss = self.create("test_allocate") self.allocate(ss, "si1", "we1", self._lease_secret.next(), - set([0,1,2]), 100) + set([0,1,2]), 100) read = ss.remote_slot_readv self.failUnlessEqual(read("si1", [0], [(0, 10)]), @@ -843,7 +923,7 @@ class MutableServer(unittest.TestCase): cancel_secret_b, nodeid_b) ) def test_leases(self): - ss = self.create("test_leases") + ss = self.create("test_leases", sizelimit=1000*1000) def secrets(n): return ( self.write_enabler("we1"), self.renew_secret("we1-%d" % n), @@ -943,11 +1023,25 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(len(remaining_shares), 1) self.failUnlessEqual(len(s0.debug_get_leases()), 1) + # cancelling a non-existent lease should raise an IndexError + self.failUnlessRaises(IndexError, + ss.remote_cancel_lease, "si1", "nonsecret") + + # and the slot should still be there + remaining_shares = read("si1", [], [(0,10)]) + self.failUnlessEqual(len(remaining_shares), 1) + self.failUnlessEqual(len(s0.debug_get_leases()), 1) + ss.remote_cancel_lease("si1", secrets(4)[2]) # now the slot should be gone no_shares = read("si1", [], [(0,10)]) self.failUnlessEqual(no_shares, {}) + # cancelling a lease on a non-existent share should raise an IndexError + self.failUnlessRaises(IndexError, + ss.remote_cancel_lease, "si2", "nonsecret") + + class Stats(unittest.TestCase): def setUp(self): -- 2.45.2