expirer: tolerate empty buckets, refactor bucketsize-counting code a bit, don't incre...
authorBrian Warner <warner@allmydata.com>
Fri, 20 Mar 2009 19:18:16 +0000 (12:18 -0700)
committerBrian Warner <warner@allmydata.com>
Fri, 20 Mar 2009 19:18:16 +0000 (12:18 -0700)
src/allmydata/storage/expirer.py
src/allmydata/test/test_storage.py

index 1c638fa2ea9cf58e48623befd12f26cfefe5e83e..bb94e11b019c391adb99c4e80226d15218b7f135 100644 (file)
@@ -114,11 +114,10 @@ class LeaseCheckingCrawler(ShareCrawler):
 
     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         bucketdir = os.path.join(prefixdir, storage_index_b32)
-        try:
-            bucket_diskbytes = self.stat(bucketdir).st_blocks * 512
-        except AttributeError:
-            bucket_diskbytes = 0 # no stat().st_blocks on windows
+        s = self.stat(bucketdir)
         would_keep_shares = []
+        wks = None
+
         for fn in os.listdir(bucketdir):
             try:
                 shnum = int(fn)
@@ -136,29 +135,26 @@ class LeaseCheckingCrawler(ShareCrawler):
                 self.state["cycle-to-date"]["corrupt-shares"].append(which)
                 wks = (1, 1, 1, "unknown")
             would_keep_shares.append(wks)
+
         sharetype = None
         if wks:
+            # use the last share's sharetype as the buckettype
             sharetype = wks[3]
         rec = self.state["cycle-to-date"]["space-recovered"]
         self.increment(rec, "examined-buckets", 1)
         if sharetype:
             self.increment(rec, "examined-buckets-"+sharetype, 1)
 
+        try:
+            bucket_diskbytes = s.st_blocks * 512
+        except AttributeError:
+            bucket_diskbytes = 0 # no stat().st_blocks on windows
         if sum([wks[0] for wks in would_keep_shares]) == 0:
-            self.increment(rec, "original-diskbytes", bucket_diskbytes)
-            self.increment(rec, "original-diskbytes-"+sharetype, bucket_diskbytes)
-            self.increment(rec, "original-buckets", 1)
-            self.increment(rec, "original-buckets-"+sharetype, 1)
+            self.increment_bucketspace("original", bucket_diskbytes, sharetype)
         if sum([wks[1] for wks in would_keep_shares]) == 0:
-            self.increment(rec, "configured-diskbytes", bucket_diskbytes)
-            self.increment(rec, "configured-diskbytes-"+sharetype, bucket_diskbytes)
-            self.increment(rec, "configured-buckets", 1)
-            self.increment(rec, "configured-buckets-"+sharetype, 1)
+            self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
         if sum([wks[2] for wks in would_keep_shares]) == 0:
-            self.increment(rec, "actual-diskbytes", bucket_diskbytes)
-            self.increment(rec, "actual-diskbytes-"+sharetype, bucket_diskbytes)
-            self.increment(rec, "actual-buckets", 1)
-            self.increment(rec, "actual-buckets-"+sharetype, 1)
+            self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
 
     def process_share(self, sharefilename):
         # first, find out what kind of a share it is
@@ -239,11 +235,20 @@ class LeaseCheckingCrawler(ShareCrawler):
             diskbytes = sharebytes
         so_far_sr = self.state["cycle-to-date"]["space-recovered"]
         self.increment(so_far_sr, a+"-shares", 1)
-        self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
         self.increment(so_far_sr, a+"-sharebytes", sharebytes)
-        self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
         self.increment(so_far_sr, a+"-diskbytes", diskbytes)
-        self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
+        if sharetype:
+            self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
+            self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
+            self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
+
+    def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
+        rec = self.state["cycle-to-date"]["space-recovered"]
+        self.increment(rec, a+"-diskbytes", bucket_diskbytes)
+        self.increment(rec, a+"-buckets", 1)
+        if sharetype:
+            self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
+            self.increment(rec, a+"-buckets-"+sharetype, 1)
 
     def increment(self, d, k, delta=1):
         if k not in d:
index b1e1ed12cd606f3cb80d575e367e21029c50ec36..0d09678cfe6d3bbd52b90c32710e6cd995772909 100644 (file)
@@ -2324,6 +2324,12 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
         # of share we corrupted, this will trigger an
         # UnknownImmutableContainerVersionError.
 
+        # also create an empty bucket
+        empty_si = base32.b2a("\x04"*16)
+        empty_bucket_dir = os.path.join(ss.sharedir,
+                                        storage_index_to_dir(empty_si))
+        fileutil.make_dirs(empty_bucket_dir)
+
         ss.setServiceParent(self.s)
 
         d = eventual.fireEventually()
@@ -2361,7 +2367,7 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
             s = lc.get_state()
             last = s["history"][0]
             rec = last["space-recovered"]
-            self.failUnlessEqual(rec["examined-buckets"], 4)
+            self.failUnlessEqual(rec["examined-buckets"], 5)
             self.failUnlessEqual(rec["examined-shares"], 3)
             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
         d.addCallback(_after_first_cycle)