]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
test_storage.py: improve test coverage
authorBrian Warner <warner@allmydata.com>
Wed, 18 Jun 2008 00:01:42 +0000 (17:01 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 18 Jun 2008 00:01:42 +0000 (17:01 -0700)
src/allmydata/test/test_storage.py

index 8d92db989a41ed530562ecc33f3042268304889f..c517f5e20b3d18de823696f40818f0d132a2d814 100644 (file)
@@ -8,14 +8,31 @@ from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
-     storage_index_to_dir
+     storage_index_to_dir, DataTooLargeError
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
 
+class Marker:
+    pass
 class FakeCanary:
-    def notifyOnDisconnect(self, *args, **kwargs):
-        pass
+    def __init__(self, ignore_disconnectors=False):
+        self.ignore = ignore_disconnectors
+        self.disconnectors = {}
+    def notifyOnDisconnect(self, f, *args, **kwargs):
+        if self.ignore:
+            return
+        m = Marker()
+        self.disconnectors[m] = (f, args, kwargs)
+        return m
     def dontNotifyOnDisconnect(self, marker):
+        if self.ignore:
+            return
+        del self.disconnectors[marker]
+
+class FakeStatsProvider:
+    def count(self, name, delta=1):
+        pass
+    def register_producer(self, producer):
         pass
 
 class Bucket(unittest.TestCase):
@@ -158,10 +175,12 @@ class BucketProxy(unittest.TestCase):
             br = BucketReader(self, sharefname)
             rb = RemoteBucket()
             rb.target = br
-            rbp = ReadBucketProxy(rb)
+            rbp = ReadBucketProxy(rb, peerid="abc")
+            self.failUnless("to peer" in repr(rbp))
             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
             d1 = rbp.startIfNecessary()
+            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
             d1.addCallback(lambda res: rbp.get_block(0))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
             d1.addCallback(lambda res: rbp.get_block(1))
@@ -207,19 +226,22 @@ class Server(unittest.TestCase):
 
     def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = StorageServer(workdir, sizelimit)
+        ss = StorageServer(workdir, sizelimit,
+                           stats_provider=FakeStatsProvider())
         ss.setServiceParent(self.sparent)
         return ss
 
     def test_create(self):
         ss = self.create("test_create")
 
-    def allocate(self, ss, storage_index, sharenums, size):
+    def allocate(self, ss, storage_index, sharenums, size, canary=None):
         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
+        if not canary:
+            canary = FakeCanary()
         return ss.remote_allocate_buckets(storage_index,
                                           renew_secret, cancel_secret,
-                                          sharenums, size, FakeCanary())
+                                          sharenums, size, canary)
 
     def test_dont_overfill_dirs(self):
         """
@@ -259,56 +281,86 @@ class Server(unittest.TestCase):
     def test_allocate(self):
         ss = self.create("test_allocate")
 
-        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
 
         canary = FakeCanary()
-        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
+        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
         self.failUnlessEqual(already, set())
         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
 
         # while the buckets are open, they should not count as readable
-        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
 
         # close the buckets
         for i,wb in writers.items():
             wb.remote_write(0, "%25d" % i)
             wb.remote_close()
+            # aborting a bucket that was already closed is a no-op
+            wb.remote_abort()
 
         # now they should be readable
-        b = ss.remote_get_buckets("vid")
+        b = ss.remote_get_buckets("allocate")
         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
 
-        # now if we about writing again, the server should offer those three
-        # buckets as already present. It should offer them even if we don't
-        # ask about those specific ones.
-        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
+        # now if we ask about writing again, the server should offer those
+        # three buckets as already present. It should offer them even if we
+        # don't ask about those specific ones.
+        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
         self.failUnlessEqual(already, set([0,1,2]))
         self.failUnlessEqual(set(writers.keys()), set([3,4]))
 
         # while those two buckets are open for writing, the server should
         # refuse to offer them to uploaders
 
-        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
-        self.failUnlessEqual(already, set([0,1,2]))
-        self.failUnlessEqual(set(writers.keys()), set([5]))
+        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+        self.failUnlessEqual(already2, set([0,1,2]))
+        self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+        # aborting the writes should remove the tempfiles
+        for i,wb in writers2.items():
+            wb.remote_abort()
+        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+        self.failUnlessEqual(already2, set([0,1,2]))
+        self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+        for i,wb in writers2.items():
+            wb.remote_abort()
+        for i,wb in writers.items():
+            wb.remote_abort()
+
+    def test_disconnect(self):
+        # simulate a disconnection
+        ss = self.create("test_disconnect")
+        canary = FakeCanary()
+        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
+        self.failUnlessEqual(already, set())
+        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
+        for (f,args,kwargs) in canary.disconnectors.values():
+            f(*args, **kwargs)
+        del already
+        del writers
+
+        # that ought to delete the incoming shares
+        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
+        self.failUnlessEqual(already, set())
+        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
 
     def test_sizelimits(self):
         ss = self.create("test_sizelimits", 5000)
-        canary = FakeCanary()
         # a newly created and filled share incurs this much overhead, beyond
         # the size we request.
         OVERHEAD = 3*4
         LEASE_SIZE = 4+32+32+4
-
-        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
+        canary = FakeCanary(True)
+        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 3000 bytes provisionally
         # allocated, allowing only 2000 more to be claimed
         self.failUnlessEqual(len(ss._active_writers), 3)
 
         # allocating 1001-byte shares only leaves room for one
-        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
+        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
         self.failUnlessEqual(len(writers2), 1)
         self.failUnlessEqual(len(ss._active_writers), 4)
 
@@ -333,7 +385,7 @@ class Server(unittest.TestCase):
         allocated = 1001 + OVERHEAD + LEASE_SIZE
         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
-        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
+        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
         self.failUnlessEqual(len(writers3), 39)
         self.failUnlessEqual(len(ss._active_writers), 39)
 
@@ -352,7 +404,7 @@ class Server(unittest.TestCase):
         # extra-file metadata, the allocation would be more than 'allocated'
         # and this test would need to be changed.
         ss = self.create("test_sizelimits", 5000)
-        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
+        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
         self.failUnlessEqual(len(writers4), 39)
         self.failUnlessEqual(len(ss._active_writers), 39)
 
@@ -569,10 +621,38 @@ class MutableServer(unittest.TestCase):
         self.failUnless(isinstance(readv_data, dict))
         self.failUnlessEqual(len(readv_data), 0)
 
+    def test_container_size(self):
+        ss = self.create("test_container_size")
+        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+                      set([0,1,2]), 100)
+        rstaraw = ss.remote_slot_testv_and_readv_and_writev
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1"),
+                    self.cancel_secret("we1") )
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        answer = rstaraw("si1", secrets,
+                         {0: ([], [(0,data)], len(data)+12)},
+                         [])
+        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+        # trying to make the container too large will raise an exception
+        TOOBIG = MutableShareFile.MAX_SIZE + 10
+        self.failUnlessRaises(DataTooLargeError,
+                              rstaraw, "si1", secrets,
+                              {0: ([], [(0,data)], TOOBIG)},
+                              [])
+
+        # it should be possible to make the container smaller, although at
+        # the moment this doesn't actually affect the share
+        answer = rstaraw("si1", secrets,
+                         {0: ([], [(0,data)], len(data)+8)},
+                         [])
+        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
     def test_allocate(self):
         ss = self.create("test_allocate")
         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
-                               set([0,1,2]), 100)
+                      set([0,1,2]), 100)
 
         read = ss.remote_slot_readv
         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
@@ -843,7 +923,7 @@ class MutableServer(unittest.TestCase):
                                    cancel_secret_b, nodeid_b) )
 
     def test_leases(self):
-        ss = self.create("test_leases")
+        ss = self.create("test_leases", sizelimit=1000*1000)
         def secrets(n):
             return ( self.write_enabler("we1"),
                      self.renew_secret("we1-%d" % n),
@@ -943,11 +1023,25 @@ class MutableServer(unittest.TestCase):
         self.failUnlessEqual(len(remaining_shares), 1)
         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
 
+        # cancelling a non-existent lease should raise an IndexError
+        self.failUnlessRaises(IndexError,
+                              ss.remote_cancel_lease, "si1", "nonsecret")
+
+        # and the slot should still be there
+        remaining_shares = read("si1", [], [(0,10)])
+        self.failUnlessEqual(len(remaining_shares), 1)
+        self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+
         ss.remote_cancel_lease("si1", secrets(4)[2])
         # now the slot should be gone
         no_shares = read("si1", [], [(0,10)])
         self.failUnlessEqual(no_shares, {})
 
+        # cancelling a lease on a non-existent share should raise an IndexError
+        self.failUnlessRaises(IndexError,
+                              ss.remote_cancel_lease, "si2", "nonsecret")
+
+
 class Stats(unittest.TestCase):
 
     def setUp(self):