]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
storage: fill alreadygot= with all known shares for the given storageindex, not just...
authorBrian Warner <warner@lothar.com>
Mon, 17 Sep 2007 07:48:40 +0000 (00:48 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 17 Sep 2007 07:48:40 +0000 (00:48 -0700)
src/allmydata/storage.py
src/allmydata/test/test_storage.py

index afd6e07bbe8c268f6ffad35d5a590521d16c3e1b..388f8e1bac8ee6e24655366313c8963f2c2ecc7c 100644 (file)
@@ -1,4 +1,5 @@
 import os, re, weakref, stat, struct, time
+from itertools import chain
 
 from foolscap import Referenceable
 from twisted.application import service
@@ -211,6 +212,7 @@ class BucketReader(Referenceable):
     def remote_read(self, offset, length):
         return self._share_file.read_share_data(offset, length)
 
+
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
@@ -265,19 +267,25 @@ class StorageServer(service.MultiService, Referenceable):
         yes_limits = not no_limits
         if yes_limits:
             remaining_space = self.sizelimit - self.allocated_size()
+
+        # fill alreadygot with all shares that we have, not just the ones
+        # they asked about: this will save them a lot of work. Add or update
+        # leases for all of them: if they want us to hold shares for this
+        # file, they'll want us to hold leases for this file.
+        for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
+                                 self._get_incoming_shares(storage_index)):
+            alreadygot.add(shnum)
+            sf = ShareFile(fn)
+            sf.add_or_renew_lease(lease_info)
+
         for shnum in sharenums:
             incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
             finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
-                alreadygot.add(shnum)
-                # add a lease for the client whose upload was pre-empted
-                if os.path.exists(incominghome):
-                    # the lease gets added to the still-in-construction share
-                    sf = ShareFile(incominghome)
-                else:
-                    sf = ShareFile(finalhome)
-                sf.add_or_renew_lease(lease_info)
+                # great! we already have it. easy.
+                pass
             elif no_limits or remaining_space >= space_per_bucket:
+                # ok! we need to create the new share file.
                 fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                 bw = BucketWriter(self, incominghome, finalhome,
                                   space_per_bucket, lease_info)
@@ -288,7 +296,7 @@ class StorageServer(service.MultiService, Referenceable):
                 if yes_limits:
                     remaining_space -= space_per_bucket
             else:
-                # not enough space to accept this bucket
+                # bummer! not enough space to accept this bucket
                 pass
 
         if bucketwriters:
@@ -354,6 +362,16 @@ class StorageServer(service.MultiService, Referenceable):
             # Commonly caused by there being no buckets at all.
             pass
 
+    def _get_incoming_shares(self, storage_index):
+        incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
+        try:
+            for f in os.listdir(incomingdir):
+                if NUM_RE.match(f):
+                    filename = os.path.join(incomingdir, f)
+                    yield (int(f), filename)
+        except OSError:
+            pass
+
     def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
         for shnum, filename in self._get_bucket_shares(storage_index):
index 61948c7e01728cb383368b569a5ee8311cdc1c19..39e2c18a5fe79316f8767630b052f466b71fdcce 100644 (file)
@@ -236,8 +236,9 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
 
         # now if we about writing again, the server should offer those three
-        # buckets as already present
-        already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
+        # buckets as already present. It should offer them even if we don't
+        # ask about those specific ones.
+        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
         self.failUnlessEqual(already, set([0,1,2]))
         self.failUnlessEqual(set(writers.keys()), set([3,4]))
 
@@ -246,7 +247,7 @@ class Server(unittest.TestCase):
         # upload into them a second time)
 
         already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
-        self.failUnlessEqual(already, set([2,3,4]))
+        self.failUnlessEqual(already, set([0,1,2,3,4]))
         self.failUnlessEqual(set(writers.keys()), set([5]))
 
     def test_sizelimits(self):