import os, re, weakref, stat, struct, time
+from itertools import chain
from foolscap import Referenceable
from twisted.application import service
def remote_read(self, offset, length):
return self._share_file.read_share_data(offset, length)
+
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'
yes_limits = not no_limits
if yes_limits:
remaining_space = self.sizelimit - self.allocated_size()
+
+ # fill alreadygot with all shares that we have, not just the ones
+ # they asked about: this will save them a lot of work. Add or update
+ # leases for all of them: if they want us to hold shares for this
+ # file, they'll want us to hold leases for this file.
+ for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
+ self._get_incoming_shares(storage_index)):
+ alreadygot.add(shnum)
+ sf = ShareFile(fn)
+ sf.add_or_renew_lease(lease_info)
+
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
- alreadygot.add(shnum)
- # add a lease for the client whose upload was pre-empted
- if os.path.exists(incominghome):
- # the lease gets added to the still-in-construction share
- sf = ShareFile(incominghome)
- else:
- sf = ShareFile(finalhome)
- sf.add_or_renew_lease(lease_info)
+ # great! we already have it. easy.
+ pass
elif no_limits or remaining_space >= space_per_bucket:
+ # ok! we need to create the new share file.
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,
space_per_bucket, lease_info)
if yes_limits:
remaining_space -= space_per_bucket
else:
- # not enough space to accept this bucket
+ # bummer! not enough space to accept this bucket
pass
if bucketwriters:
# Commonly caused by there being no buckets at all.
pass
+ def _get_incoming_shares(self, storage_index):
+ incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
+ try:
+ for f in os.listdir(incomingdir):
+ if NUM_RE.match(f):
+ filename = os.path.join(incomingdir, f)
+ yield (int(f), filename)
+ except OSError:
+ pass
+
def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index):
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
# now if we about writing again, the server should offer those three
- # buckets as already present
- already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
+ # buckets as already present. It should offer them even if we don't
+ # ask about those specific ones.
+ already,writers = self.allocate(ss, "vid", [2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# upload into them a second time)
already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
- self.failUnlessEqual(already, set([2,3,4]))
+ self.failUnlessEqual(already, set([0,1,2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
def test_sizelimits(self):