From 814922a9a151018e06536a05a5eb3ea7d502904e Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 10 Jun 2008 11:53:10 -0700 Subject: [PATCH] storage: ignore shares in incoming/, to make clients use other servers during simultaneous uploads --- src/allmydata/storage.py | 11 ++++++++--- src/allmydata/test/test_storage.py | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 5687aadf..7bee77d5 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -772,8 +772,7 @@ class StorageServer(service.MultiService, Referenceable): # they asked about: this will save them a lot of work. Add or update # leases for all of them: if they want us to hold shares for this # file, they'll want us to hold leases for this file. - for (shnum, fn) in chain(self._get_bucket_shares(storage_index), - self._get_incoming_shares(storage_index)): + for (shnum, fn) in self._get_bucket_shares(storage_index): alreadygot.add(shnum) sf = ShareFile(fn) sf.add_or_renew_lease(lease_info) @@ -785,9 +784,15 @@ class StorageServer(service.MultiService, Referenceable): for shnum in sharenums: incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum) - if os.path.exists(incominghome) or os.path.exists(finalhome): + if os.path.exists(finalhome): # great! we already have it. easy. pass + elif os.path.exists(incominghome): + # Note that we don't create BucketWriters for shnums that + # have a partial share (in incoming/), so if a second upload + # occurs while the first is still in progress, the second + # uploader will use different storage servers. + pass elif no_limits or remaining_space >= space_per_bucket: # ok! we need to create the new share file. bw = BucketWriter(self, incominghome, finalhome, diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 5b389e5a..26f64a86 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -261,6 +261,7 @@ class Server(unittest.TestCase): # while the buckets are open, they should not count as readable self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) + # close the buckets for i,wb in writers.items(): wb.remote_write(0, "%25d" % i) wb.remote_close() @@ -278,11 +279,10 @@ class Server(unittest.TestCase): self.failUnlessEqual(set(writers.keys()), set([3,4])) # while those two buckets are open for writing, the server should - # tell new uploaders that they already exist (so that we don't try to - # upload into them a second time) + # refuse to offer them to uploaders already,writers = self.allocate(ss, "vid", [2,3,4,5], 75) - self.failUnlessEqual(already, set([0,1,2,3,4])) + self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([5])) def test_sizelimits(self): @@ -465,11 +465,19 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(writers), 5) already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4, sharenums, size, canary) - self.failUnlessEqual(len(already2), 5) + self.failUnlessEqual(len(already2), 0) self.failUnlessEqual(len(writers2), 0) for wb in writers.values(): wb.remote_close() + leases = list(ss.get_leases("si3")) + self.failUnlessEqual(len(leases), 1) + + already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4, + sharenums, size, canary) + self.failUnlessEqual(len(already3), 5) + self.failUnlessEqual(len(writers3), 0) + leases = list(ss.get_leases("si3")) self.failUnlessEqual(len(leases), 2) -- 2.45.2