--- /dev/null
+import time, os, pickle
+from crawler import ShareCrawler
+from shares import get_share_file
+
+class LeaseCheckingCrawler(ShareCrawler):
+ """I examine the leases on all shares, determining which are still valid
+ and which have expired. I can remove the expired leases (if so
+ configured), and the share will be deleted when the last lease is
+ removed.
+
+ I collect statistics on the leases and make these available to a web
+ status page, including::
+
+ Space recovered during this cycle-so-far:
+ actual (only if expire_leases=True):
+ num-buckets, num-shares, sum of share sizes, real disk usage
+ ('real disk usage' means we use stat(fn).st_blocks*512 and include any
+ space used by the directory)
+ what it would have been with the original lease expiration time
+ what it would have been with our configured expiration time
+
+ Prediction of space that will be recovered during the rest of this cycle
+ Prediction of space that will be recovered by the entire current cycle.
+
+ Space recovered during the last 10 cycles <-- saved in separate pickle
+
+ Shares/buckets examined:
+ this cycle-so-far
+ prediction of rest of cycle
+ during last 10 cycles <-- separate pickle
+ start/finish time of last 10 cycles <-- separate pickle
+ expiration time used for last 10 cycles <-- separate pickle
+
+ Histogram of leases-per-share:
+ this-cycle-to-date
+ last 10 cycles <-- separate pickle
+ Histogram of lease ages, buckets = expiration_time/10
+ cycle-to-date
+ last 10 cycles <-- separate pickle
+
+ All cycle-to-date values remain valid until the start of the next cycle.
+
+ """
+
+ slow_start = 360 # wait 6 minutes after startup
+ minimum_cycle_time = 12*60*60 # not more than twice per day
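+ # (the unit tests lower these on the instance, e.g. slow_start=0, so
+ # that crawl cycles start immediately)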
+
+ def __init__(self, server, statefile, historyfile,
+ expire_leases, expiration_time):
+ self.historyfile = historyfile
+ self.expire_leases = expire_leases
+ self.age_limit = expiration_time
+ ShareCrawler.__init__(self, server, statefile)
+
+ def add_initial_state(self):
+ # we fill ["cycle-to-date"] here (even though it will be reset in
+ # self.started_cycle) just in case someone grabs our state before we
+ # get started: unit tests do this
+ so_far = self.create_empty_cycle_dict()
+ self.state.setdefault("cycle-to-date", so_far)
+
+ # initialize history
+ if not os.path.exists(self.historyfile):
+ history = {} # cyclenum -> dict
+ f = open(self.historyfile, "wb")
+ pickle.dump(history, f)
+ f.close()
+
+ def create_empty_cycle_dict(self):
+ recovered = self.create_empty_recovered_dict()
+ so_far = {"buckets-examined": 0,
+ "shares-examined": 0,
+ "space-recovered": recovered,
+ "lease-age-histogram": {}, # (minage,maxage)->count
+ "leases-per-share-histogram": {}, # leasecount->numshares
+ }
+ return so_far
+
+ def create_empty_recovered_dict(self):
+ recovered = {}
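+ # zeroed counters, keyed like "actual-numbuckets",
+ # "original-leasetimer-sharebytes", "configured-leasetimer-diskbytes"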
+ for a in ("actual", "original-leasetimer", "configured-leasetimer"):
+ for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"):
+ recovered[a+"-"+b] = 0
+ return recovered
+
+ def started_cycle(self, cycle):
+ self.state["cycle-to-date"] = self.create_empty_cycle_dict()
+
+ def stat(self, fn):
+ return os.stat(fn)
+
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+ bucketdir = os.path.join(prefixdir, storage_index_b32)
+ try:
+ bucket_diskbytes = self.stat(bucketdir).st_blocks * 512
+ except AttributeError:
+ bucket_diskbytes = 0 # no stat().st_blocks on windows
+ would_keep_shares = []
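+ # each entry is the 3-element list returned by process_share():
+ # [keep-per-original-timer, keep-per-configured-timer, actually-kept]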
+ for fn in os.listdir(bucketdir):
+ try:
+ shnum = int(fn)
+ wks = self.process_share(os.path.join(bucketdir, fn))
+ would_keep_shares.append(wks)
+ except ValueError:
+ pass # non-numeric means not a sharefile
+ recovered = self.state["cycle-to-date"]["space-recovered"]
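+ # a bucket only counts as recovered under a given policy if none of
+ # its shares would be kept under that policy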
+ if sum([wks[0] for wks in would_keep_shares]) == 0:
+ self.increment(recovered,
+ "original-leasetimer-diskbytes", bucket_diskbytes)
+ self.increment(recovered, "original-leasetimer-numbuckets", 1)
+ if sum([wks[1] for wks in would_keep_shares]) == 0:
+ self.increment(recovered,
+ "configured-leasetimer-diskbytes", bucket_diskbytes)
+ self.increment(recovered, "configured-leasetimer-numbuckets", 1)
+ if sum([wks[2] for wks in would_keep_shares]) == 0:
+ self.increment(recovered,
+ "actual-diskbytes", bucket_diskbytes)
+ self.increment(recovered, "actual-numbuckets", 1)
+ self.state["cycle-to-date"]["buckets-examined"] += 1
+
+ def process_share(self, sharefilename):
+ # first, find out what kind of a share it is
+ sf = get_share_file(sharefilename)
+ now = time.time()
+ s = self.stat(sharefilename)
+
+ num_leases = 0
+ num_valid_leases_original = 0
+ num_valid_leases_configured = 0
+ expired_leases_configured = []
+
+ for li in sf.get_leases():
+ num_leases += 1
+ original_expiration_time = li.get_expiration_time()
+ grant_renew_time = li.get_grant_renew_time_time()
+ age = li.get_age()
+ self.add_lease_age_to_histogram(age)
+
+ # expired-or-not according to original expiration time
+ if original_expiration_time > now:
+ num_valid_leases_original += 1
+
+ # expired-or-not according to our configured age limit
+ if age < self.age_limit:
+ num_valid_leases_configured += 1
+ else:
+ expired_leases_configured.append(li)
+
+ so_far = self.state["cycle-to-date"]
+ self.increment(so_far["leases-per-share-histogram"], num_leases, 1)
+ so_far["shares-examined"] += 1
+
+ would_keep_share = [1, 1, 1]
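+ # [original-leasetimer, configured-leasetimer, actual]: 1 means the
+ # share survives under that policy, 0 means it would be (or was) removed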
+
+ if self.expire_leases:
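+ # cancel each expired lease; removing the last lease deletes the
+ # share file itself (see class docstring)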
+ for li in expired_leases_configured:
+ sf.cancel_lease(li.cancel_secret)
+
+ if num_valid_leases_original == 0:
+ would_keep_share[0] = 0
+ self.increment_space("original-leasetimer", s)
+
+ if num_valid_leases_configured == 0:
+ would_keep_share[1] = 0
+ self.increment_space("configured-leasetimer", s)
+ if self.expire_leases:
+ would_keep_share[2] = 0
+ self.increment_space("actual", s)
+
+ return would_keep_share
+
+ def increment_space(self, a, s):
+ sharebytes = s.st_size
+ try:
+ # note that stat(2) says st_blocks is measured in 512-byte units, and
+ # that st_blksize is the "optimal file sys I/O ops blocksize", which is
+ # independent of the block size that st_blocks uses.
+ diskbytes = s.st_blocks * 512
+ except AttributeError:
+ # the docs say that st_blocks is only available on linux. I also see
+ # it on MacOS. But it isn't available on windows.
+ diskbytes = sharebytes
+ so_far_sr = self.state["cycle-to-date"]["space-recovered"]
+ self.increment(so_far_sr, a+"-numshares", 1)
+ self.increment(so_far_sr, a+"-sharebytes", sharebytes)
+ self.increment(so_far_sr, a+"-diskbytes", diskbytes)
+
+ def increment(self, d, k, delta=1):
+ if k not in d:
+ d[k] = 0
+ d[k] += delta
+
+ def add_lease_age_to_histogram(self, age):
+ bucket_interval = self.age_limit / 10.0
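+ # e.g. with a 31-day age_limit (2678400s) the interval is 267840s, so
+ # a 2-day-old lease (172800s) lands in the (0.0, 267840.0) bucket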
+ bucket_number = int(age/bucket_interval)
+ bucket_start = bucket_number * bucket_interval
+ bucket_end = bucket_start + bucket_interval
+ k = (bucket_start, bucket_end)
+ self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
+
+ def convert_lease_age_histogram(self, lah):
+ # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
+ # since the former is not JSON-safe (JSON dictionaries must have
+ # string keys).
+ json_safe_lah = []
+ for k in sorted(lah):
+ (minage,maxage) = k
+ json_safe_lah.append( (minage, maxage, lah[k]) )
+ return json_safe_lah
+
+ def finished_cycle(self, cycle):
+ # add to our history state, prune old history
+ h = {}
+
+ start = self.state["current-cycle-start-time"]
+ now = time.time()
+ h["cycle-start-finish-times"] = (start, now)
+ h["expiration-enabled"] = self.expire_leases
+ h["configured-expiration-time"] = self.age_limit
+
+ s = self.state["cycle-to-date"]
+
+ # state["lease-age-histogram"] is a dictionary (mapping
+ # (minage,maxage) tuple to a sharecount), but we report
+ # self.get_state()["lease-age-histogram"] as a list of
+ # (min,max,sharecount) tuples, because JSON can handle that better.
+ # We record the list-of-tuples form into the history for the same
+ # reason.
+ lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
+ h["lease-age-histogram"] = lah
+ h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
+ h["buckets-examined"] = s["buckets-examined"]
+ h["shares-examined"] = s["shares-examined"]
+ # note: if ["shares-recovered"] ever acquires an internal dict, this
+ # copy() needs to become a deepcopy
+ h["space-recovered"] = s["space-recovered"].copy()
+
+ history = pickle.load(open(self.historyfile, "rb"))
+ history[cycle] = h
+ while len(history) > 10:
+ oldcycles = sorted(history.keys())
+ del history[oldcycles[0]]
+ f = open(self.historyfile, "wb")
+ pickle.dump(history, f)
+ f.close()
+
+ def get_state(self):
+ """In addition to the crawler state described in
+ ShareCrawler.get_state(), I return the following keys which are
+ specific to the lease-checker/expirer. Note that the non-history keys
+ (with 'cycle' in their names) are only present if a cycle is
+ currently running. If the crawler is between cycles, it appropriate
+ to show the latest item in the 'history' key instead. Also note that
+ each history item has all the data in the 'cycle-to-date' value, plus
+ cycle-start-finish-times.
+
+ cycle-to-date:
+ expiration-enabled
+ configured-expiration-time
+ lease-age-histogram (list of (minage,maxage,sharecount) tuples)
+ leases-per-share-histogram
+ buckets-examined
+ shares-examined
+ space-recovered
+
+ estimated-remaining-cycle:
+ # Values may be None if not enough data has been gathered to
+ # produce an estimate.
+ buckets-examined
+ shares-examined
+ space-recovered
+
+ estimated-current-cycle:
+ # cycle-to-date plus estimated-remaining. Values may be None if
+ # not enough data has been gathered to produce an estimate.
+ buckets-examined
+ shares-examined
+ space-recovered
+
+ history: maps cyclenum to a dict with the following keys:
+ cycle-start-finish-times
+ expiration-enabled
+ configured-expiration-time
+ lease-age-histogram
+ leases-per-share-histogram
+ buckets-examined
+ shares-examined
+ space-recovered
+
+ The 'space-recovered' structure is a dictionary with the following
+ keys:
+ # 'actual' is what was actually deleted
+ actual-numbuckets
+ actual-numshares
+ actual-sharebytes
+ actual-diskbytes
+ # would have been deleted, if the original lease timer was used
+ original-leasetimer-numbuckets
+ original-leasetimer-numshares
+ original-leasetimer-sharebytes
+ original-leasetimer-diskbytes
+ # would have been deleted, if our configured max_age was used
+ configured-leasetimer-numbuckets
+ configured-leasetimer-numshares
+ configured-leasetimer-sharebytes
+ configured-leasetimer-diskbytes
+
+ """
+ progress = self.get_progress()
+
+ state = ShareCrawler.get_state(self) # does a shallow copy
+ history = pickle.load(open(self.historyfile, "rb"))
+ state["history"] = history
+
+ if not progress["cycle-in-progress"]:
+ del state["cycle-to-date"]
+ return state
+
+ so_far = state["cycle-to-date"].copy()
+ state["cycle-to-date"] = so_far
+
+ lah = so_far["lease-age-histogram"]
+ so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
+ so_far["expiration-enabled"] = self.expire_leases
+ so_far["configured-expiration-time"] = self.age_limit
+
+ so_far_sr = so_far["space-recovered"]
+ remaining_sr = {}
+ remaining = {"space-recovered": remaining_sr}
+ cycle_sr = {}
+ cycle = {"space-recovered": cycle_sr}
+
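+ # if the cycle has made measurable progress, extrapolate the
+ # cycle-so-far counters to estimate the remainder of the cycle and the
+ # whole cycle; otherwise report None for every estimate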
+ if progress["cycle-complete-percentage"] > 0.0:
+ m = 100.0 / progress["cycle-complete-percentage"]
+ for a in ("actual", "original-leasetimer", "configured-leasetimer"):
+ for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"):
+ k = a+"-"+b
+ remaining_sr[k] = m * so_far_sr[k]
+ cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
+ predshares = m * so_far["shares-examined"]
+ remaining["shares-examined"] = predshares
+ cycle["shares-examined"] = so_far["shares-examined"] + predshares
+ predbuckets = m * so_far["buckets-examined"]
+ remaining["buckets-examined"] = predbuckets
+ cycle["buckets-examined"] = so_far["buckets-examined"] + predbuckets
+ else:
+ for a in ("actual", "original-leasetimer", "configured-leasetimer"):
+ for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"):
+ k = a+"-"+b
+ remaining_sr[k] = None
+ cycle_sr[k] = None
+ remaining["shares-examined"] = None
+ cycle["shares-examined"] = None
+ remaining["buckets-examined"] = None
+ cycle["buckets-examined"] = None
+
+ state["estimated-remaining-cycle"] = remaining
+ state["estimated-current-cycle"] = cycle
+ return state
-import time, os.path, stat, re
+import time, os.path, stat, re, simplejson
from twisted.trial import unittest
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin
-from allmydata.storage.server import StorageServer, storage_index_to_dir
+from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
-from allmydata.storage.common import DataTooLargeError
+from allmydata.storage.common import DataTooLargeError, storage_index_to_dir
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
+from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
+from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix
class Marker:
ss = self.create("test_container_size")
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
set([0,1,2]), 100)
+ read = ss.remote_slot_readv
rstaraw = ss.remote_slot_testv_and_readv_and_writev
secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
[])
# it should be possible to make the container smaller, although at
- # the moment this doesn't actually affect the share
+ # the moment this doesn't actually affect the share, unless the
+ # container size is dropped to zero, in which case the share is
+ # deleted.
answer = rstaraw("si1", secrets,
{0: ([], [(0,data)], len(data)+8)},
[])
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+ answer = rstaraw("si1", secrets,
+ {0: ([], [(0,data)], 0)},
+ [])
+ self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+ read_answer = read("si1", [0], [(0,10)])
+ self.failUnlessEqual(read_answer, {})
+
def test_allocate(self):
ss = self.create("test_allocate")
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
def compare_leases_without_timestamps(self, leases_a, leases_b):
self.failUnlessEqual(len(leases_a), len(leases_b))
for i in range(len(leases_a)):
- num_a, a = leases_a[i]
- num_b, b = leases_b[i]
- self.failUnlessEqual(num_a, num_b)
+ a = leases_a[i]
+ b = leases_b[i]
self.failUnlessEqual(a.owner_num, b.owner_num)
self.failUnlessEqual(a.renew_secret, b.renew_secret)
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
def compare_leases(self, leases_a, leases_b):
self.failUnlessEqual(len(leases_a), len(leases_b))
for i in range(len(leases_a)):
- num_a, a = leases_a[i]
- num_b, b = leases_b[i]
- self.failUnlessEqual(num_a, num_b)
+ a = leases_a[i]
+ b = leases_b[i]
self.failUnlessEqual(a.owner_num, b.owner_num)
self.failUnlessEqual(a.renew_secret, b.renew_secret)
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
f.close()
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
- self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ self.failUnlessEqual(len(list(s0.get_leases())), 1)
# add-lease on a missing storage index is silently ignored
self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
# re-allocate the slots and use the same secrets, that should update
# the lease
write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
- self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renew it directly
ss.remote_renew_lease("si1", secrets(0)[1])
- self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ self.failUnlessEqual(len(list(s0.get_leases())), 1)
# now allocate them with a bunch of different secrets, to trigger the
# extended lease code. Use add_lease for one of them.
write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
- self.failUnlessEqual(len(s0.debug_get_leases()), 2)
+ self.failUnlessEqual(len(list(s0.get_leases())), 2)
secrets2 = secrets(2)
ss.remote_add_lease("si1", secrets2[1], secrets2[2])
- self.failUnlessEqual(len(s0.debug_get_leases()), 3)
+ self.failUnlessEqual(len(list(s0.get_leases())), 3)
write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
- self.failUnlessEqual(len(s0.debug_get_leases()), 6)
+ self.failUnlessEqual(len(list(s0.get_leases())), 6)
# cancel one of them
ss.remote_cancel_lease("si1", secrets(5)[2])
- self.failUnlessEqual(len(s0.debug_get_leases()), 5)
+ self.failUnlessEqual(len(list(s0.get_leases())), 5)
- all_leases = s0.debug_get_leases()
+ all_leases = list(s0.get_leases())
# and write enough data to expand the container, forcing the server
# to move the leases
write("si1", secrets(0),
[])
# read back the leases, make sure they're still intact.
- self.compare_leases_without_timestamps(all_leases,
- s0.debug_get_leases())
+ self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
ss.remote_renew_lease("si1", secrets(0)[1])
ss.remote_renew_lease("si1", secrets(1)[1])
ss.remote_renew_lease("si1", secrets(2)[1])
ss.remote_renew_lease("si1", secrets(3)[1])
ss.remote_renew_lease("si1", secrets(4)[1])
- self.compare_leases_without_timestamps(all_leases,
- s0.debug_get_leases())
+ self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
# get a new copy of the leases, with the current timestamps. Reading
# data and failing to renew/cancel leases should leave the timestamps
# alone.
- all_leases = s0.debug_get_leases()
+ all_leases = list(s0.get_leases())
# renewing with a bogus token should prompt an error message
# examine the exception thus raised, make sure the old nodeid is
self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si1",
secrets(20)[2])
- self.compare_leases(all_leases, s0.debug_get_leases())
+ self.compare_leases(all_leases, list(s0.get_leases()))
# reading shares should not modify the timestamp
read("si1", [], [(0,200)])
- self.compare_leases(all_leases, s0.debug_get_leases())
+ self.compare_leases(all_leases, list(s0.get_leases()))
write("si1", secrets(0),
{0: ([], [(200, "make me bigger")], None)}, [])
- self.compare_leases_without_timestamps(all_leases,
- s0.debug_get_leases())
+ self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
write("si1", secrets(0),
{0: ([], [(500, "make me really bigger")], None)}, [])
- self.compare_leases_without_timestamps(all_leases,
- s0.debug_get_leases())
+ self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
# now cancel them all
ss.remote_cancel_lease("si1", secrets(0)[2])
# the slot should still be there
remaining_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(len(remaining_shares), 1)
- self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ self.failUnlessEqual(len(list(s0.get_leases())), 1)
# cancelling a non-existent lease should raise an IndexError
self.failUnlessRaises(IndexError,
# and the slot should still be there
remaining_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(len(remaining_shares), 1)
- self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+ self.failUnlessEqual(len(list(s0.get_leases())), 1)
ss.remote_cancel_lease("si1", secrets(4)[2])
# now the slot should be gone
html = w.renderSynchronously()
s = remove_tags(html)
self.failUnless("Total buckets: 0 (the number of" in s, s)
- self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
+ self.failUnless("Next crawl in 59 minutes" in s, s)
d.addCallback(_check2)
return d
ss.setServiceParent(self.s)
return d
+class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
+ stop_after_first_bucket = False
+ def process_bucket(self, *args, **kwargs):
+ LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
+ if self.stop_after_first_bucket:
+ self.stop_after_first_bucket = False
+ self.cpu_slice = -1.0
+ def yielding(self, sleep_time):
+ if not self.stop_after_first_bucket:
+ self.cpu_slice = 500
+
+class BrokenStatResults:
+ pass
+class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
+ def stat(self, fn):
+ s = os.stat(fn)
+ bsr = BrokenStatResults()
+ for attrname in dir(s):
+ if attrname.startswith("_"):
+ continue
+ if attrname == "st_blocks":
+ continue
+ setattr(bsr, attrname, getattr(s, attrname))
+ return bsr
+
+class InstrumentedStorageServer(StorageServer):
+ LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
+class No_ST_BLOCKS_StorageServer(StorageServer):
+ LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
+
+class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
+
+ def setUp(self):
+ self.s = service.MultiService()
+ self.s.startService()
+ def tearDown(self):
+ return self.s.stopService()
+
+ def make_shares(self, ss):
+ def make(si):
+ return (si, hashutil.tagged_hash("renew", si),
+ hashutil.tagged_hash("cancel", si))
+ def make_mutable(si):
+ return (si, hashutil.tagged_hash("renew", si),
+ hashutil.tagged_hash("cancel", si),
+ hashutil.tagged_hash("write-enabler", si))
+ def make_extra_lease(si, num):
+ return (hashutil.tagged_hash("renew-%d" % num, si),
+ hashutil.tagged_hash("cancel-%d" % num, si))
+
+ immutable_si_0, rs0, cs0 = make("\x00" * 16)
+ immutable_si_1, rs1, cs1 = make("\x01" * 16)
+ rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
+ mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
+ mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
+ rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
+ sharenums = [0]
+ canary = FakeCanary()
+ # note: 'tahoe debug dump-share' will not handle this file, since the
+ # inner contents are not a valid CHK share
+ data = "\xff" * 1000
+
+ a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
+ 1000, canary)
+ w[0].remote_write(0, data)
+ w[0].remote_close()
+
+ a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
+ 1000, canary)
+ w[0].remote_write(0, data)
+ w[0].remote_close()
+ ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
+
+ writev = ss.remote_slot_testv_and_readv_and_writev
+ writev(mutable_si_2, (we2, rs2, cs2),
+ {0: ([], [(0,data)], len(data))}, [])
+ writev(mutable_si_3, (we3, rs3, cs3),
+ {0: ([], [(0,data)], len(data))}, [])
+ ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
+
+ self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
+ self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
+ self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
+
+ def test_basic(self):
+ basedir = "storage/LeaseCrawler/basic"
+ fileutil.make_dirs(basedir)
+ ss = InstrumentedStorageServer(basedir, "\x00" * 20)
+ # make it start sooner than usual.
+ lc = ss.lease_checker
+ lc.slow_start = 0
+ lc.cpu_slice = 500
+ lc.stop_after_first_bucket = True
+ webstatus = StorageStatus(ss)
+
+ # create a few shares, with some leases on them
+ self.make_shares(ss)
+ [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
+
+ # add a non-sharefile to exercise another code path
+ fn = os.path.join(ss.sharedir,
+ storage_index_to_dir(immutable_si_0),
+ "not-a-share")
+ f = open(fn, "wb")
+ f.write("I am not a share.\n")
+ f.close()
+
+ # this is before the crawl has started, so we're not in a cycle yet
+ initial_state = lc.get_state()
+ self.failIf(lc.get_progress()["cycle-in-progress"])
+ self.failIf("cycle-to-date" in initial_state)
+ self.failIf("estimated-remaining-cycle" in initial_state)
+ self.failIf("estimated-current-cycle" in initial_state)
+ self.failUnless("history" in initial_state)
+ self.failUnlessEqual(initial_state["history"], {})
+
+ ss.setServiceParent(self.s)
+
+ d = eventual.fireEventually()
+
+ # now examine the state right after the first bucket has been
+ # processed.
+ def _after_first_bucket(ignored):
+ initial_state = lc.get_state()
+ self.failUnless("cycle-to-date" in initial_state)
+ self.failUnless("estimated-remaining-cycle" in initial_state)
+ self.failUnless("estimated-current-cycle" in initial_state)
+ self.failUnless("history" in initial_state)
+ self.failUnlessEqual(initial_state["history"], {})
+
+ so_far = initial_state["cycle-to-date"]
+ self.failUnlessEqual(so_far["expiration-enabled"], False)
+ self.failUnless("configured-expiration-time" in so_far)
+ self.failUnless("lease-age-histogram" in so_far)
+ lah = so_far["lease-age-histogram"]
+ self.failUnlessEqual(type(lah), list)
+ self.failUnlessEqual(len(lah), 1)
+ self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 1) ] )
+ self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
+ self.failUnlessEqual(so_far["buckets-examined"], 1)
+ self.failUnlessEqual(so_far["shares-examined"], 1)
+ sr1 = so_far["space-recovered"]
+ self.failUnlessEqual(sr1["actual-numshares"], 0)
+ self.failUnlessEqual(sr1["configured-leasetimer-diskbytes"], 0)
+ self.failUnlessEqual(sr1["original-leasetimer-sharebytes"], 0)
+ left = initial_state["estimated-remaining-cycle"]
+ self.failUnless(left["buckets-examined"] > 0,
+ left["buckets-examined"])
+ self.failUnless(left["shares-examined"] > 0,
+ left["shares-examined"])
+ sr2 = left["space-recovered"]
+ self.failIfEqual(sr2["actual-numshares"], None)
+ self.failIfEqual(sr2["configured-leasetimer-diskbytes"], None)
+ self.failIfEqual(sr2["original-leasetimer-sharebytes"], None)
+ d.addCallback(_after_first_bucket)
+ d.addCallback(lambda ign: self.render1(webstatus))
+ def _check_html_in_cycle(html):
+ s = remove_tags(html)
+ self.failUnlessIn("So far, this cycle has examined "
+ "1 shares in 1 buckets "
+ "and has recovered: "
+ "0 buckets, 0 shares, 0 B ", s)
+ self.failUnlessIn("If expiration were enabled, "
+ "we would have recovered: "
+ "0 buckets, 0 shares, 0 B by now", s)
+ self.failUnlessIn("and the remainder of this cycle "
+ "would probably recover: "
+ "0 buckets, 0 shares, 0 B ", s)
+ self.failUnlessIn("and the whole cycle would probably recover: "
+ "0 buckets, 0 shares, 0 B ", s)
+ self.failUnlessIn("if we were using each lease's default "
+ "31-day lease lifetime", s)
+ self.failUnlessIn("this cycle would be expected to recover: ", s)
+ d.addCallback(_check_html_in_cycle)
+
+ # wait for the crawler to finish the first cycle. Nothing should have
+ # been removed.
+ def _wait():
+ return bool(lc.get_state()["last-cycle-finished"] is not None)
+ d.addCallback(lambda ign: self.poll(_wait))
+
+ def _after_first_cycle(ignored):
+ s = lc.get_state()
+ self.failIf("cycle-to-date" in s)
+ self.failIf("estimated-remaining-cycle" in s)
+ self.failIf("estimated-current-cycle" in s)
+ last = s["history"][0]
+ self.failUnless("cycle-start-finish-times" in last)
+ self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
+ self.failUnlessEqual(last["expiration-enabled"], False)
+ self.failUnless("configured-expiration-time" in last)
+
+ self.failUnless("lease-age-histogram" in last)
+ lah = last["lease-age-histogram"]
+ self.failUnlessEqual(type(lah), list)
+ self.failUnlessEqual(len(lah), 1)
+ self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 6) ] )
+
+ self.failUnlessEqual(last["leases-per-share-histogram"],
+ {1: 2, 2: 2})
+ self.failUnlessEqual(last["buckets-examined"], 4)
+ self.failUnlessEqual(last["shares-examined"], 4)
+
+ rec = last["space-recovered"]
+ self.failUnlessEqual(rec["actual-numbuckets"], 0)
+ self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0)
+ self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 0)
+ self.failUnlessEqual(rec["actual-numshares"], 0)
+ self.failUnlessEqual(rec["original-leasetimer-numshares"], 0)
+ self.failUnlessEqual(rec["configured-leasetimer-numshares"], 0)
+ self.failUnlessEqual(rec["actual-diskbytes"], 0)
+ self.failUnlessEqual(rec["original-leasetimer-diskbytes"], 0)
+ self.failUnlessEqual(rec["configured-leasetimer-diskbytes"], 0)
+ self.failUnlessEqual(rec["actual-sharebytes"], 0)
+ self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0)
+ self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], 0)
+
+ def _get_sharefile(si):
+ return list(ss._iter_share_files(si))[0]
+ def count_leases(si):
+ return len(list(_get_sharefile(si).get_leases()))
+ self.failUnlessEqual(count_leases(immutable_si_0), 1)
+ self.failUnlessEqual(count_leases(immutable_si_1), 2)
+ self.failUnlessEqual(count_leases(mutable_si_2), 1)
+ self.failUnlessEqual(count_leases(mutable_si_3), 2)
+ d.addCallback(_after_first_cycle)
+ d.addCallback(lambda ign: self.render1(webstatus))
+ def _check_html(html):
+ s = remove_tags(html)
+ self.failUnlessIn("recovered: 0 buckets, 0 shares, 0 B "
+ "but expiration was not enabled", s)
+ d.addCallback(_check_html)
+ return d
+
+ def backdate_lease(self, sf, renew_secret, new_expire_time):
+ # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
+ # "renew" a lease with a new_expire_time that is older than what the
+ # current lease has), so we have to reach inside it.
+ for i,lease in enumerate(sf.get_leases()):
+ if lease.renew_secret == renew_secret:
+ lease.expiration_time = new_expire_time
+ f = open(sf.home, 'rb+')
+ sf._write_lease_record(f, i, lease)
+ f.close()
+ return
+ raise IndexError("unable to renew non-existent lease")
+
+ def test_expire(self):
+ basedir = "storage/LeaseCrawler/expire"
+ fileutil.make_dirs(basedir)
+ # setting expiration_time to 2000 means that any lease which is more
+ # than 2000s old will be expired.
+ ss = InstrumentedStorageServer(basedir, "\x00" * 20,
+ expire_leases=True,
+ expiration_time=2000)
+ # make it start sooner than usual.
+ lc = ss.lease_checker
+ lc.slow_start = 0
+ lc.stop_after_first_bucket = True
+ webstatus = StorageStatus(ss)
+
+ # create a few shares, with some leases on them
+ self.make_shares(ss)
+ [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
+
+ def count_shares(si):
+ return len(list(ss._iter_share_files(si)))
+ def _get_sharefile(si):
+ return list(ss._iter_share_files(si))[0]
+ def count_leases(si):
+ return len(list(_get_sharefile(si).get_leases()))
+
+ self.failUnlessEqual(count_shares(immutable_si_0), 1)
+ self.failUnlessEqual(count_leases(immutable_si_0), 1)
+ self.failUnlessEqual(count_shares(immutable_si_1), 1)
+ self.failUnlessEqual(count_leases(immutable_si_1), 2)
+ self.failUnlessEqual(count_shares(mutable_si_2), 1)
+ self.failUnlessEqual(count_leases(mutable_si_2), 1)
+ self.failUnlessEqual(count_shares(mutable_si_3), 1)
+ self.failUnlessEqual(count_leases(mutable_si_3), 2)
+
+ # artificially crank back the expiration time on the first lease of
+ # each share, to make it look like it expired already (age=1000s).
+ # Some shares have an extra lease which is set to expire at the
+ # default time in 31 days from now (age=31days). We then run the
+ # crawler, which will expire the first lease, making some shares get
+ # deleted and others stay alive (with one remaining lease)
+ now = time.time()
+
+ sf0 = _get_sharefile(immutable_si_0)
+ self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
+ sf0_size = os.stat(sf0.home).st_size
+
+ # immutable_si_1 gets an extra lease
+ sf1 = _get_sharefile(immutable_si_1)
+ self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
+
+ sf2 = _get_sharefile(mutable_si_2)
+ self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
+ sf2_size = os.stat(sf2.home).st_size
+
+ # mutable_si_3 gets an extra lease
+ sf3 = _get_sharefile(mutable_si_3)
+ self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
+
+ ss.setServiceParent(self.s)
+
+ d = eventual.fireEventually()
+ # examine the state right after the first bucket has been processed
+ def _after_first_bucket(ignored):
+ p = lc.get_progress()
+ self.failUnless(p["cycle-in-progress"])
+ d.addCallback(_after_first_bucket)
+ d.addCallback(lambda ign: self.render1(webstatus))
+ def _check_html_in_cycle(html):
+ s = remove_tags(html)
+ # the first bucket encountered gets deleted, and its prefix
+ # happens to be about 1/6th of the way through the ring, so the
+ # predictor thinks we'll have 6 shares and that we'll delete them
+ # all. This part of the test depends upon the SIs landing right
+ # where they do now.
+ self.failUnlessIn("The remainder of this cycle is expected to "
+ "recover: 5 buckets, 5 shares", s)
+ self.failUnlessIn("The whole cycle is expected to examine "
+ "6 shares in 6 buckets and to recover: "
+ "6 buckets, 6 shares", s)
+ d.addCallback(_check_html_in_cycle)
+
+ # wait for the crawler to finish the first cycle. Two shares should
+ # have been removed
+ def _wait():
+ return bool(lc.get_state()["last-cycle-finished"] is not None)
+ d.addCallback(lambda ign: self.poll(_wait))
+
+ def _after_first_cycle(ignored):
+ self.failUnlessEqual(count_shares(immutable_si_0), 0)
+ self.failUnlessEqual(count_shares(immutable_si_1), 1)
+ self.failUnlessEqual(count_leases(immutable_si_1), 1)
+ self.failUnlessEqual(count_shares(mutable_si_2), 0)
+ self.failUnlessEqual(count_shares(mutable_si_3), 1)
+ self.failUnlessEqual(count_leases(mutable_si_3), 1)
+
+ s = lc.get_state()
+ last = s["history"][0]
+
+ self.failUnlessEqual(last["expiration-enabled"], True)
+ self.failUnlessEqual(last["configured-expiration-time"], 2000)
+ self.failUnlessEqual(last["buckets-examined"], 4)
+ self.failUnlessEqual(last["shares-examined"], 4)
+ self.failUnlessEqual(last["leases-per-share-histogram"],
+ {1: 2, 2: 2})
+
+ rec = last["space-recovered"]
+ self.failUnlessEqual(rec["actual-numbuckets"], 2)
+ self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 2)
+ self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2)
+ self.failUnlessEqual(rec["actual-numshares"], 2)
+ self.failUnlessEqual(rec["original-leasetimer-numshares"], 2)
+ self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2)
+ size = sf0_size + sf2_size
+ self.failUnlessEqual(rec["actual-sharebytes"], size)
+ self.failUnlessEqual(rec["original-leasetimer-sharebytes"], size)
+ self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size)
+ self.failUnless(rec["actual-diskbytes"] >= size,
+ rec["actual-diskbytes"])
+ self.failUnless(rec["original-leasetimer-diskbytes"] >= size,
+ rec["original-leasetimer-diskbytes"])
+ self.failUnless(rec["configured-leasetimer-diskbytes"] >= size,
+ rec["configured-leasetimer-diskbytes"])
+ d.addCallback(_after_first_cycle)
+ d.addCallback(lambda ign: self.render1(webstatus))
+ def _check_html(html):
+ s = remove_tags(html)
+ self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
+ self.failUnlessIn(" recovered: 2 buckets, 2 shares, ", s)
+ d.addCallback(_check_html)
+ return d
+
+ def test_limited_history(self):
+ basedir = "storage/LeaseCrawler/limited_history"
+ fileutil.make_dirs(basedir)
+ ss = StorageServer(basedir, "\x00" * 20)
+ # make it start sooner than usual.
+ lc = ss.lease_checker
+ lc.slow_start = 0
+ lc.cpu_slice = 500
+
+ # create a few shares, with some leases on them
+ self.make_shares(ss)
+
+ ss.setServiceParent(self.s)
+
+ def _wait_until_15_cycles_done():
+ last = lc.state["last-cycle-finished"]
+ if last is not None and last >= 15:
+ return True
+ if lc.timer:
+ lc.timer.reset(0)
+ return False
+ d = self.poll(_wait_until_15_cycles_done)
+
+ def _check(ignored):
+ s = lc.get_state()
+ h = s["history"]
+ self.failUnlessEqual(len(h), 10)
+ self.failUnlessEqual(max(h.keys()), 15)
+ self.failUnlessEqual(min(h.keys()), 6)
+ d.addCallback(_check)
+ return d
+
+ def test_unpredictable_future(self):
+ basedir = "storage/LeaseCrawler/unpredictable_future"
+ fileutil.make_dirs(basedir)
+ ss = StorageServer(basedir, "\x00" * 20)
+ # make it start sooner than usual.
+ lc = ss.lease_checker
+ lc.slow_start = 0
+ lc.cpu_slice = -1.0 # stop quickly
+
+ self.make_shares(ss)
+
+ ss.setServiceParent(self.s)
+
+ d = eventual.fireEventually()
+ def _check(ignored):
+ # this should fire after the first bucket is complete, but before
+ # the first prefix is complete, so the progress-measurer won't
+ # think we've gotten far enough to raise our percent-complete
+ # above 0%, triggering the cannot-predict-the-future code in
+ # expirer.py . This will have to change if/when the
+ # progress-measurer gets smart enough to count buckets (we'll
+ # have to interrupt it even earlier, before it's finished the
+ # first bucket).
+ s = lc.get_state()
+ self.failUnless("cycle-to-date" in s)
+ self.failUnless("estimated-remaining-cycle" in s)
+ self.failUnless("estimated-current-cycle" in s)
+
+ left = s["estimated-remaining-cycle"]["space-recovered"]
+ self.failUnlessEqual(left["actual-numbuckets"], None)
+ self.failUnlessEqual(left["original-leasetimer-numbuckets"], None)
+ self.failUnlessEqual(left["configured-leasetimer-numbuckets"], None)
+ self.failUnlessEqual(left["actual-numshares"], None)
+ self.failUnlessEqual(left["original-leasetimer-numshares"], None)
+ self.failUnlessEqual(left["configured-leasetimer-numshares"], None)
+ self.failUnlessEqual(left["actual-diskbytes"], None)
+ self.failUnlessEqual(left["original-leasetimer-diskbytes"], None)
+ self.failUnlessEqual(left["configured-leasetimer-diskbytes"], None)
+ self.failUnlessEqual(left["actual-sharebytes"], None)
+ self.failUnlessEqual(left["original-leasetimer-sharebytes"], None)
+ self.failUnlessEqual(left["configured-leasetimer-sharebytes"], None)
+
+ full = s["estimated-remaining-cycle"]["space-recovered"]
+ self.failUnlessEqual(full["actual-numbuckets"], None)
+ self.failUnlessEqual(full["original-leasetimer-numbuckets"], None)
+ self.failUnlessEqual(full["configured-leasetimer-numbuckets"], None)
+ self.failUnlessEqual(full["actual-numshares"], None)
+ self.failUnlessEqual(full["original-leasetimer-numshares"], None)
+ self.failUnlessEqual(full["configured-leasetimer-numshares"], None)
+ self.failUnlessEqual(full["actual-diskbytes"], None)
+ self.failUnlessEqual(full["original-leasetimer-diskbytes"], None)
+ self.failUnlessEqual(full["configured-leasetimer-diskbytes"], None)
+ self.failUnlessEqual(full["actual-sharebytes"], None)
+ self.failUnlessEqual(full["original-leasetimer-sharebytes"], None)
+ self.failUnlessEqual(full["configured-leasetimer-sharebytes"], None)
+
+ d.addCallback(_check)
+ return d
+
+ def test_no_st_blocks(self):
+ basedir = "storage/LeaseCrawler/no_st_blocks"
+ fileutil.make_dirs(basedir)
+ ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
+ expiration_time=-1000)
+ # a negative expiration_time= means every lease is already older than
+ # the configured age limit, so the "configured-leasetimer-"
+ # space-recovered counts will be non-zero
+
+ # make it start sooner than usual.
+ lc = ss.lease_checker
+ lc.slow_start = 0
+
+ self.make_shares(ss)
+ ss.setServiceParent(self.s)
+ def _wait():
+ return bool(lc.get_state()["last-cycle-finished"] is not None)
+ d = self.poll(_wait)
+
+ def _check(ignored):
+ s = lc.get_state()
+ last = s["history"][0]
+ rec = last["space-recovered"]
+ self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 4)
+ self.failUnlessEqual(rec["configured-leasetimer-numshares"], 4)
+ self.failUnless(rec["configured-leasetimer-sharebytes"] > 0,
+ rec["configured-leasetimer-sharebytes"])
+ # without the .st_blocks field in os.stat() results, we should be
+ # reporting diskbytes==sharebytes
+ self.failUnlessEqual(rec["configured-leasetimer-sharebytes"],
+ rec["configured-leasetimer-diskbytes"])
+ d.addCallback(_check)
+ return d
class NoStatvfsServer(StorageServer):
def do_statvfs(self):
raise AttributeError
-class WebStatus(unittest.TestCase, pollmixin.PollMixin):
+class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
def setUp(self):
self.s = service.MultiService()
ss = StorageServer(basedir, "\x00" * 20)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
- html = w.renderSynchronously()
- self.failUnless("<h1>Storage Server Status</h1>" in html, html)
- s = remove_tags(html)
- self.failUnless("Accepting new shares: Yes" in s, s)
- self.failUnless("Reserved space: - 0 B (0)" in s, s)
+ d = self.render1(w)
+ def _check_html(html):
+ self.failUnless("<h1>Storage Server Status</h1>" in html, html)
+ s = remove_tags(html)
+ self.failUnless("Accepting new shares: Yes" in s, s)
+ self.failUnless("Reserved space: - 0 B (0)" in s, s)
+ d.addCallback(_check_html)
+ d.addCallback(lambda ign: self.render_json(w))
+ def _check_json(json):
+ data = simplejson.loads(json)
+ s = data["stats"]
+ self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
+ self.failUnlessEqual(s["storage_server.reserved_space"], 0)
+ self.failUnless("bucket-counter" in data)
+ self.failUnless("lease-checker" in data)
+ d.addCallback(_check_json)
+ return d
+
+ def render_json(self, page):
+ d = self.render1(page, args={"t": ["json"]})
+ return d
def test_status_no_statvfs(self):
# windows has no os.statvfs . Make sure the code handles that even on