def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
bucketdir = os.path.join(prefixdir, storage_index_b32)
- try:
- bucket_diskbytes = self.stat(bucketdir).st_blocks * 512
- except AttributeError:
- bucket_diskbytes = 0 # no stat().st_blocks on windows
+ s = self.stat(bucketdir)
would_keep_shares = []
+ wks = None
+
for fn in os.listdir(bucketdir):
try:
shnum = int(fn)
self.state["cycle-to-date"]["corrupt-shares"].append(which)
wks = (1, 1, 1, "unknown")
would_keep_shares.append(wks)
+
sharetype = None
if wks:
+ # use the last share's sharetype as the buckettype
sharetype = wks[3]
rec = self.state["cycle-to-date"]["space-recovered"]
self.increment(rec, "examined-buckets", 1)
if sharetype:
self.increment(rec, "examined-buckets-"+sharetype, 1)
+ try:
+ bucket_diskbytes = s.st_blocks * 512
+ except AttributeError:
+ bucket_diskbytes = 0 # no stat().st_blocks on windows
if sum([wks[0] for wks in would_keep_shares]) == 0:
- self.increment(rec, "original-diskbytes", bucket_diskbytes)
- self.increment(rec, "original-diskbytes-"+sharetype, bucket_diskbytes)
- self.increment(rec, "original-buckets", 1)
- self.increment(rec, "original-buckets-"+sharetype, 1)
+ self.increment_bucketspace("original", bucket_diskbytes, sharetype)
if sum([wks[1] for wks in would_keep_shares]) == 0:
- self.increment(rec, "configured-diskbytes", bucket_diskbytes)
- self.increment(rec, "configured-diskbytes-"+sharetype, bucket_diskbytes)
- self.increment(rec, "configured-buckets", 1)
- self.increment(rec, "configured-buckets-"+sharetype, 1)
+ self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
if sum([wks[2] for wks in would_keep_shares]) == 0:
- self.increment(rec, "actual-diskbytes", bucket_diskbytes)
- self.increment(rec, "actual-diskbytes-"+sharetype, bucket_diskbytes)
- self.increment(rec, "actual-buckets", 1)
- self.increment(rec, "actual-buckets-"+sharetype, 1)
+ self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
def process_share(self, sharefilename):
# first, find out what kind of a share it is
diskbytes = sharebytes
so_far_sr = self.state["cycle-to-date"]["space-recovered"]
self.increment(so_far_sr, a+"-shares", 1)
- self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
self.increment(so_far_sr, a+"-sharebytes", sharebytes)
- self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
self.increment(so_far_sr, a+"-diskbytes", diskbytes)
- self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
+ if sharetype:
+ self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
+ self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
+ self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
+
+ def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
+ rec = self.state["cycle-to-date"]["space-recovered"]
+ self.increment(rec, a+"-diskbytes", bucket_diskbytes)
+ self.increment(rec, a+"-buckets", 1)
+ if sharetype:
+ self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
+ self.increment(rec, a+"-buckets-"+sharetype, 1)
def increment(self, d, k, delta=1):
if k not in d:
# of share we corrupted, this will trigger an
# UnknownImmutableContainerVersionError.
+ # also create an empty bucket
+ empty_si = base32.b2a("\x04"*16)
+ empty_bucket_dir = os.path.join(ss.sharedir,
+ storage_index_to_dir(empty_si))
+ fileutil.make_dirs(empty_bucket_dir)
+
ss.setServiceParent(self.s)
d = eventual.fireEventually()
s = lc.get_state()
last = s["history"][0]
rec = last["space-recovered"]
- self.failUnlessEqual(rec["examined-buckets"], 4)
+ self.failUnlessEqual(rec["examined-buckets"], 5)
self.failUnlessEqual(rec["examined-shares"], 3)
self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
d.addCallback(_after_first_cycle)