storageserver: implement size limits. No code to enable them yet, though
authorBrian Warner <warner@allmydata.com>
Wed, 4 Jul 2007 00:08:02 +0000 (17:08 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 4 Jul 2007 00:08:02 +0000 (17:08 -0700)
src/allmydata/storageserver.py
src/allmydata/test/test_storage.py

index 7403ecc1281564e8fd625e50292d9121a7c38d60..c70d113a056649b5547b561b345235e122d889cb 100644 (file)
@@ -1,4 +1,4 @@
-import os, re
+import os, re, weakref
 
 from foolscap import Referenceable
 from twisted.application import service
@@ -25,15 +25,20 @@ NUM_RE=re.compile("[0-9]*")
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)
 
-    def __init__(self, incominghome, finalhome, blocksize):
+    def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
+        self.ss = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
         self.blocksize = blocksize
+        self.sharesize = sharesize
         self.closed = False
         self._next_segnum = 0
         fileutil.make_dirs(incominghome)
         self._write_file('blocksize', str(blocksize))
 
+    def allocated_size(self):
+        return self.sharesize
+
     def _write_file(self, fname, data):
         open(os.path.join(self.incominghome, fname), 'wb').write(data)
 
@@ -87,6 +92,7 @@ class BucketWriter(Referenceable):
             pass
             
         self.closed = True
+        self.ss.bucket_writer_closed(self)
 
 def str2l(s):
     """ split string (pulled from storage) into a list of blockids """
@@ -128,39 +134,70 @@ class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
 
-    def __init__(self, storedir):
+    def __init__(self, storedir, sizelimit=None):
+        service.MultiService.__init__(self)
         fileutil.make_dirs(storedir)
         self.storedir = storedir
+        self.sizelimit = sizelimit
         self.incomingdir = os.path.join(storedir, 'incoming')
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
+        self._active_writers = weakref.WeakKeyDictionary()
 
-        service.MultiService.__init__(self)
+        self.measure_size()
 
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
+    def measure_size(self):
+        self.consumed = fileutil.du(self.storedir)
+
+    def allocated_size(self):
+        space = self.consumed
+        for bw in self._active_writers:
+            space += bw.allocated_size()
+        return space
+
     def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
                                 blocksize, canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
+        si_s = idlib.b2a(storage_index)
+        space_per_bucket = sharesize
+        no_limits = self.sizelimit is None
+        yes_limits = not no_limits
+        if yes_limits:
+            remaining_space = self.sizelimit - self.allocated_size()
         for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum)
-            finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum)
+            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
+            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
+            elif no_limits or remaining_space >= space_per_bucket:
+                bw = BucketWriter(self, incominghome, finalhome,
+                                  blocksize, space_per_bucket)
+                bucketwriters[shnum] = bw
+                self._active_writers[bw] = 1
+                if yes_limits:
+                    remaining_space -= space_per_bucket
             else:
-                bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
-            
+                # not enough space to accept this bucket
+                pass
+
         return alreadygot, bucketwriters
 
+    def bucket_writer_closed(self, bw):
+        self.consumed += bw.allocated_size()
+        del self._active_writers[bw]
+
     def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
         storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
         try:
             for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
-                    bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f))
+                    br = BucketReader(os.path.join(storagedir, f))
+                    bucketreaders[int(f)] = br
         except OSError:
             # Commonly caused by there being no buckets at all.
             pass
index b8f9297bdc4c89fbce4dd00ed00a09e406e86a3d..4154cbaf543daad9aa627e422daafe1c863d014b 100644 (file)
@@ -16,9 +16,12 @@ class Bucket(unittest.TestCase):
         fileutil.make_dirs(basedir)
         return incoming, final
 
+    def bucket_writer_closed(self, bw):
+        pass
+
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = storageserver.BucketWriter(incoming, final, 25)
+        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
         bw.remote_put_block(0, "a"*25)
         bw.remote_put_block(1, "b"*25)
         bw.remote_put_block(2, "c"*7) # last block may be short
@@ -26,7 +29,7 @@ class Bucket(unittest.TestCase):
 
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = storageserver.BucketWriter(incoming, final, 25)
+        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
         bw.remote_put_block(0, "a"*25)
         bw.remote_put_block(1, "b"*25)
         bw.remote_put_block(2, "c"*7) # last block may be short
@@ -52,12 +55,12 @@ class Server(unittest.TestCase):
         return self.sparent.stopService()
 
     def workdir(self, name):
-        basedir = os.path.join("test_storage", "Server", name)
+        basedir = os.path.join("storage", "Server", name)
         return basedir
 
-    def create(self, name):
+    def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = storageserver.StorageServer(workdir)
+        ss = storageserver.StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
         return ss
 
@@ -104,3 +107,48 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(already, set([2,3,4]))
         self.failUnlessEqual(set(writers.keys()), set([5]))
 
+    def test_sizelimits(self):
+        ss = self.create("test_sizelimits", 100)
+        canary = Referenceable()
+        
+        already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
+                                                     25, 5, canary)
+        self.failUnlessEqual(len(writers), 3)
+        # now the StorageServer should have 75 bytes provisionally allocated,
+        # allowing only 25 more to be claimed
+
+        already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers2), 1)
+
+        # we abandon the first set, so their provisional allocation should be
+        # returned
+        del already
+        del writers
+
+        # and we close the second set, so their provisional allocation should
+        # become real, long-term allocation
+        for bw in writers2.values():
+            bw.remote_close()
+        del already2
+        del writers2
+        del bw
+
+        # now there should be 25 bytes allocated, and 75 free
+        already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers3), 3)
+
+        del already3
+        del writers3
+        ss.disownServiceParent()
+        del ss
+
+        # creating a new StorageServer in the same directory should see the
+        # same usage. note that metadata will be counted at startup but not
+        # during runtime, so if we were creating any metadata, the allocation
+        # would be more than 25 bytes and this test would need to be changed.
+        ss = self.create("test_sizelimits", 100)
+        already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers4), 3)