From: Brian Warner Date: Wed, 4 Jul 2007 00:08:02 +0000 (-0700) Subject: storageserver: implement size limits. No code to enable them yet, though X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=c80ea7d69399b1536b8cdde9473e705bcc223f31;p=tahoe-lafs%2Ftahoe-lafs.git storageserver: implement size limits. No code to enable them yet, though --- diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 7403ecc1..c70d113a 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -1,4 +1,4 @@ -import os, re +import os, re, weakref from foolscap import Referenceable from twisted.application import service @@ -25,15 +25,20 @@ NUM_RE=re.compile("[0-9]*") class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, incominghome, finalhome, blocksize): + def __init__(self, ss, incominghome, finalhome, blocksize, sharesize): + self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self.blocksize = blocksize + self.sharesize = sharesize self.closed = False self._next_segnum = 0 fileutil.make_dirs(incominghome) self._write_file('blocksize', str(blocksize)) + def allocated_size(self): + return self.sharesize + def _write_file(self, fname, data): open(os.path.join(self.incominghome, fname), 'wb').write(data) @@ -87,6 +92,7 @@ class BucketWriter(Referenceable): pass self.closed = True + self.ss.bucket_writer_closed(self) def str2l(s): """ split string (pulled from storage) into a list of blockids """ @@ -128,39 +134,70 @@ class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) name = 'storageserver' - def __init__(self, storedir): + def __init__(self, storedir, sizelimit=None): + service.MultiService.__init__(self) fileutil.make_dirs(storedir) self.storedir = storedir + self.sizelimit = sizelimit self.incomingdir = os.path.join(storedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) + self._active_writers = weakref.WeakKeyDictionary() - service.MultiService.__init__(self) + self.measure_size() def _clean_incomplete(self): fileutil.rm_dir(self.incomingdir) + def measure_size(self): + self.consumed = fileutil.du(self.storedir) + + def allocated_size(self): + space = self.consumed + for bw in self._active_writers: + space += bw.allocated_size() + return space + def remote_allocate_buckets(self, storage_index, sharenums, sharesize, blocksize, canary): alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter + si_s = idlib.b2a(storage_index) + space_per_bucket = sharesize + no_limits = self.sizelimit is None + yes_limits = not no_limits + if yes_limits: + remaining_space = self.sizelimit - self.allocated_size() for shnum in sharenums: - incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum) - finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum) + incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum) + finalhome = os.path.join(self.storedir, si_s, "%d" % shnum) if os.path.exists(incominghome) or os.path.exists(finalhome): alreadygot.add(shnum) + elif no_limits or remaining_space >= space_per_bucket: + bw = BucketWriter(self, incominghome, finalhome, + blocksize, space_per_bucket) + bucketwriters[shnum] = bw + self._active_writers[bw] = 1 + if yes_limits: + remaining_space -= space_per_bucket else: - bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize) - + # not enough space to accept this bucket + pass + return alreadygot, bucketwriters + def bucket_writer_closed(self, bw): + self.consumed += bw.allocated_size() + del self._active_writers[bw] + def remote_get_buckets(self, storage_index): bucketreaders = {} # k: sharenum, v: BucketReader storagedir = os.path.join(self.storedir, idlib.b2a(storage_index)) try: for f in os.listdir(storagedir): if NUM_RE.match(f): - bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f)) + br = BucketReader(os.path.join(storagedir, f)) + bucketreaders[int(f)] = br except OSError: # Commonly caused by there being no buckets at all. pass diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index b8f9297b..4154cbaf 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -16,9 +16,12 @@ class Bucket(unittest.TestCase): fileutil.make_dirs(basedir) return incoming, final + def bucket_writer_closed(self, bw): + pass + def test_create(self): incoming, final = self.make_workdir("test_create") - bw = storageserver.BucketWriter(incoming, final, 25) + bw = storageserver.BucketWriter(self, incoming, final, 25, 57) bw.remote_put_block(0, "a"*25) bw.remote_put_block(1, "b"*25) bw.remote_put_block(2, "c"*7) # last block may be short @@ -26,7 +29,7 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = storageserver.BucketWriter(incoming, final, 25) + bw = storageserver.BucketWriter(self, incoming, final, 25, 57) bw.remote_put_block(0, "a"*25) bw.remote_put_block(1, "b"*25) bw.remote_put_block(2, "c"*7) # last block may be short @@ -52,12 +55,12 @@ class Server(unittest.TestCase): return self.sparent.stopService() def workdir(self, name): - basedir = os.path.join("test_storage", "Server", name) + basedir = os.path.join("storage", "Server", name) return basedir - def create(self, name): + def create(self, name, sizelimit=None): workdir = self.workdir(name) - ss = storageserver.StorageServer(workdir) + ss = storageserver.StorageServer(workdir, sizelimit) ss.setServiceParent(self.sparent) return ss @@ -104,3 +107,48 @@ class Server(unittest.TestCase): self.failUnlessEqual(already, set([2,3,4])) self.failUnlessEqual(set(writers.keys()), set([5])) + def test_sizelimits(self): + ss = self.create("test_sizelimits", 100) + canary = Referenceable() + + already,writers = ss.remote_allocate_buckets("vid1", [0,1,2], + 25, 5, canary) + self.failUnlessEqual(len(writers), 3) + # now the StorageServer should have 75 bytes provisionally allocated, + # allowing only 25 more to be claimed + + already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2], + 25, 5, canary) + self.failUnlessEqual(len(writers2), 1) + + # we abandon the first set, so their provisional allocation should be + # returned + del already + del writers + + # and we close the second set, so their provisional allocation should + # become real, long-term allocation + for bw in writers2.values(): + bw.remote_close() + del already2 + del writers2 + del bw + + # now there should be 25 bytes allocated, and 75 free + already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3], + 25, 5, canary) + self.failUnlessEqual(len(writers3), 3) + + del already3 + del writers3 + ss.disownServiceParent() + del ss + + # creating a new StorageServer in the same directory should see the + # same usage. note that metadata will be counted at startup but not + # during runtime, so if we were creating any metadata, the allocation + # would be more than 25 bytes and this test would need to be changed. + ss = self.create("test_sizelimits", 100) + already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3], + 25, 5, canary) + self.failUnlessEqual(len(writers4), 3)