-import os, re
+import os, re, weakref
from foolscap import Referenceable
from twisted.application import service
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, incominghome, finalhome, blocksize):
+ def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
+ self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self.blocksize = blocksize
+ self.sharesize = sharesize
self.closed = False
self._next_segnum = 0
fileutil.make_dirs(incominghome)
self._write_file('blocksize', str(blocksize))
+ def allocated_size(self):
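+ # the server counts this full share size against its sizelimit as
+ # soon as the writer is created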
+ return self.sharesize
+
def _write_file(self, fname, data):
open(os.path.join(self.incominghome, fname), 'wb').write(data)
pass
self.closed = True
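+ # tell the server that our provisional allocation is now real,
+ # consumed space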
+ self.ss.bucket_writer_closed(self)
def str2l(s):
""" split string (pulled from storage) into a list of blockids """
implements(RIStorageServer)
name = 'storageserver'
- def __init__(self, storedir):
+ def __init__(self, storedir, sizelimit=None):
+ service.MultiService.__init__(self)
fileutil.make_dirs(storedir)
self.storedir = storedir
+ self.sizelimit = sizelimit
self.incomingdir = os.path.join(storedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
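+ # track active writers with weak references, so that a writer
+ # abandoned by its client is garbage-collected and its provisional
+ # allocation is released automatically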
+ self._active_writers = weakref.WeakKeyDictionary()
- service.MultiService.__init__(self)
+ self.measure_size()
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
+ def measure_size(self):
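+ # walk the storage directory to learn how much space is already
+ # consumed; called once at startup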
+ self.consumed = fileutil.du(self.storedir)
+
+ def allocated_size(self):
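+ # space already consumed on disk, plus the provisional allocations
+ # of all still-active writers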
+ space = self.consumed
+ for bw in self._active_writers:
+ space += bw.allocated_size()
+ return space
+
def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
blocksize, canary):
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
+ si_s = idlib.b2a(storage_index)
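+ # provisional space accounting: each new bucket claims a full share
+ # of space until its writer is closed or abandoned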
+ space_per_bucket = sharesize
+ limited = self.sizelimit is not None
+ if limited:
+ remaining_space = self.sizelimit - self.allocated_size()
for shnum in sharenums:
- incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum)
- finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum)
+ incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
+ finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
+ elif (not limited) or remaining_space >= space_per_bucket:
+ bw = BucketWriter(self, incominghome, finalhome,
+ blocksize, space_per_bucket)
+ bucketwriters[shnum] = bw
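+ # the value is unused; the WeakKeyDictionary acts as a weak set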
+ self._active_writers[bw] = 1
+ if limited:
+ remaining_space -= space_per_bucket
else:
- bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
-
+ # not enough space to accept this bucket
+ pass
+
return alreadygot, bucketwriters
+ def bucket_writer_closed(self, bw):
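+ # the writer has finished successfully: its provisional allocation
+ # becomes permanent, consumed space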
+ self.consumed += bw.allocated_size()
+ del self._active_writers[bw]
+
def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader
storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
try:
for f in os.listdir(storagedir):
if NUM_RE.match(f):
- bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f))
+ br = BucketReader(os.path.join(storagedir, f))
+ bucketreaders[int(f)] = br
except OSError:
# Commonly caused by there being no buckets at all.
pass
fileutil.make_dirs(basedir)
return incoming, final
+ def bucket_writer_closed(self, bw):
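+ # stub: these tests pass the test case itself as the BucketWriter's
+ # storage server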
+ pass
+
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = storageserver.BucketWriter(incoming, final, 25)
+ bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
bw.remote_put_block(0, "a"*25)
bw.remote_put_block(1, "b"*25)
bw.remote_put_block(2, "c"*7) # last block may be short
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = storageserver.BucketWriter(incoming, final, 25)
+ bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
bw.remote_put_block(0, "a"*25)
bw.remote_put_block(1, "b"*25)
bw.remote_put_block(2, "c"*7) # last block may be short
return self.sparent.stopService()
def workdir(self, name):
- basedir = os.path.join("test_storage", "Server", name)
+ basedir = os.path.join("storage", "Server", name)
return basedir
- def create(self, name):
+ def create(self, name, sizelimit=None):
workdir = self.workdir(name)
- ss = storageserver.StorageServer(workdir)
+ ss = storageserver.StorageServer(workdir, sizelimit)
ss.setServiceParent(self.sparent)
return ss
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
+ def test_sizelimits(self):
+ ss = self.create("test_sizelimits", 100)
+ canary = Referenceable()
+
+ already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
+ 25, 5, canary)
+ self.failUnlessEqual(len(writers), 3)
+ # now the StorageServer should have 75 bytes provisionally allocated,
+ # allowing only 25 more to be claimed
+
+ already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
+ 25, 5, canary)
+ self.failUnlessEqual(len(writers2), 1)
+
+ # we abandon the first set, so their provisional allocation should be
+ # returned
+ del already
+ del writers
+
+ # and we close the second set, so their provisional allocation should
+ # become real, long-term allocation
+ for bw in writers2.values():
+ bw.remote_close()
+ del already2
+ del writers2
+ del bw
+
+ # now there should be 25 bytes allocated, and 75 free
+ already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
+ 25, 5, canary)
+ self.failUnlessEqual(len(writers3), 3)
+
+ del already3
+ del writers3
+ ss.disownServiceParent()
+ del ss
+
+ # creating a new StorageServer in the same directory should see the
+ # same usage. note that metadata will be counted at startup but not
+ # during runtime, so if we were creating any metadata, the allocation
+ # would be more than 25 bytes and this test would need to be changed.
+ ss = self.create("test_sizelimits", 100)
+ already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
+ 25, 5, canary)
+ self.failUnlessEqual(len(writers4), 3)
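
Taken together, the patch maintains the invariant that allocated_size()
equals the space measured at startup plus the provisional claims of every
still-open writer. A minimal standalone sketch of that accounting (the
sizelimit and share sizes below are hypothetical values, not taken from the
patch):

    sizelimit = 100
    consumed = 0        # what fileutil.du() measured at startup
    active = []         # provisional allocations of open writers

    def allocated_size():
        return consumed + sum(active)

    # allocate three 25-byte shares, refusing any that would not fit
    for _ in range(3):
        if sizelimit - allocated_size() >= 25:
            active.append(25)
    assert allocated_size() == 75

    # closing a writer converts its provisional claim into consumed space;
    # the total is unchanged, but 25 bytes are now permanent
    consumed += active.pop()
    assert allocated_size() == 75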