1 import time, os, pickle, struct
2 from allmydata.storage.crawler import ShareCrawler
3 from allmydata.storage.shares import get_share_file
4 from allmydata.storage.common import UnknownMutableContainerVersionError, \
5 UnknownImmutableContainerVersionError
6 from twisted.python import log as twlog
class LeaseCheckingCrawler(ShareCrawler):
    """I examine the leases on all shares, determining which are still valid
    and which have expired. I can remove the expired leases (if so
    configured), and the share will be deleted when the last lease is
    removed.

    I collect statistics on the leases and make these available to a web
    status page, including::

    Space recovered during this cycle-so-far:
     actual (only if expiration_enabled=True):
      num-buckets, num-shares, sum of share sizes, real disk usage
      ('real disk usage' means we use stat(fn).st_blocks*512 and include any
       space used by the directory)
     what it would have been with the original lease expiration time
     what it would have been with our configured expiration time

    Prediction of space that will be recovered during the rest of this cycle
    Prediction of space that will be recovered by the entire current cycle.

    Space recovered during the last 10 cycles  <-- saved in separate pickle

    Shares/buckets examined:
     this cycle-so-far
     prediction of rest of cycle
     during last 10 cycles  <-- separate pickle
     start/finish time of last 10 cycles  <-- separate pickle
     expiration time used for last 10 cycles  <-- separate pickle

    Histogram of leases-per-share:
     this-cycle
     last 10 cycles  <-- separate pickle
    Histogram of lease ages, buckets = 1day
     cycle-to-date
     last 10 cycles  <-- separate pickle

    All cycle-to-date values remain valid until the start of the next cycle.
    """

    slow_start = 360  # wait 6 minutes after startup
    minimum_cycle_time = 12*60*60  # not more than twice per day
51 def __init__(self, server, statefile, historyfile,
52 expiration_enabled, mode,
53 override_lease_duration, # used if expiration_mode=="age"
54 cutoff_date, # used if expiration_mode=="cutoff-date"
56 self.historyfile = historyfile
57 self.expiration_enabled = expiration_enabled
59 self.override_lease_duration = None
60 self.cutoff_date = None
61 if self.mode == "age":
62 assert isinstance(override_lease_duration, (int, type(None)))
63 self.override_lease_duration = override_lease_duration # seconds
64 elif self.mode == "cutoff-date":
65 assert isinstance(cutoff_date, int) # seconds-since-epoch
66 assert cutoff_date is not None
67 self.cutoff_date = cutoff_date
69 raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
70 self.sharetypes_to_expire = sharetypes
71 ShareCrawler.__init__(self, server, statefile)
def add_initial_state(self):
    """Seed crawler state and the on-disk history file.

    Fills state["cycle-to-date"] even though started_cycle() will reset it,
    so that anyone who grabs our state before the first cycle starts (unit
    tests do this) sees a complete structure. Also creates an empty pickled
    history file if one does not exist yet.
    """
    # we fill ["cycle-to-date"] here (even though they will be reset in
    # self.started_cycle) just in case someone grabs our state before we
    # get started: unit tests do this
    so_far = self.create_empty_cycle_dict()
    self.state.setdefault("cycle-to-date", so_far)
    # in case we upgrade the code while a cycle is in progress, update
    # the keys individually
    for k in so_far:
        self.state["cycle-to-date"].setdefault(k, so_far[k])

    # initialize history
    if not os.path.exists(self.historyfile):
        history = {}  # cyclenum -> dict
        # use a context manager so the handle is closed even on error
        with open(self.historyfile, "wb") as f:
            pickle.dump(history, f)
def create_empty_cycle_dict(self):
    """Return a fresh per-cycle statistics dict with all counters zeroed."""
    recovered = self.create_empty_recovered_dict()
    so_far = {"corrupt-shares": [],
              "space-recovered": recovered,
              "lease-age-histogram": {},  # (minage,maxage)->count
              "leases-per-share-histogram": {},  # leasecount->numshares
              }
    return so_far
def create_empty_recovered_dict(self):
    """Return a zeroed space-recovered dict.

    Keys are '<category>-<unit>[-<sharetype>]' for every combination of
    category (actual/original/configured/examined), unit (buckets/shares/
    sharebytes/diskbytes), and optional sharetype suffix (mutable/immutable),
    48 keys in total.
    """
    recovered = {}
    for a in ("actual", "original", "configured", "examined"):
        for b in ("buckets", "shares", "sharebytes", "diskbytes"):
            recovered[a+"-"+b] = 0
            recovered[a+"-"+b+"-mutable"] = 0
            recovered[a+"-"+b+"-immutable"] = 0
    return recovered
def started_cycle(self, cycle):
    """Reset the cycle-to-date statistics at the start of each cycle."""
    self.state["cycle-to-date"] = self.create_empty_cycle_dict()
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
    """Examine every share in one bucket directory and update statistics.

    Calls process_share() on each numerically-named file; corrupt shares
    (unknown container version or truncated header) are recorded in
    state["cycle-to-date"]["corrupt-shares"] and treated as kept. If no
    share in the bucket would be kept under a given policy, the bucket's
    disk usage is credited to that policy's recovered-space counters.
    """
    bucketdir = os.path.join(prefixdir, storage_index_b32)
    s = self.stat(bucketdir)
    would_keep_shares = []
    wks = None  # remembers the last share processed, for its sharetype

    for fn in os.listdir(bucketdir):
        try:
            shnum = int(fn)
        except ValueError:
            continue  # non-numeric means not a sharefile
        sharefile = os.path.join(bucketdir, fn)
        try:
            wks = self.process_share(sharefile)
        except (UnknownMutableContainerVersionError,
                UnknownImmutableContainerVersionError,
                struct.error):
            twlog.msg("lease-checker error processing %s" % sharefile)
            twlog.err()
            which = (storage_index_b32, shnum)
            self.state["cycle-to-date"]["corrupt-shares"].append(which)
            # pretend the corrupt share is kept under every policy
            wks = (1, 1, 1, "unknown")
        would_keep_shares.append(wks)

    sharetype = None
    if wks:
        # use the last share's sharetype as the buckettype
        sharetype = wks[3]
    rec = self.state["cycle-to-date"]["space-recovered"]
    self.increment(rec, "examined-buckets", 1)
    if sharetype:
        self.increment(rec, "examined-buckets-"+sharetype, 1)

    try:
        bucket_diskbytes = s.st_blocks * 512
    except AttributeError:
        bucket_diskbytes = 0  # no stat().st_blocks on windows
    # a bucket is "recovered" under a policy when none of its shares
    # would be kept under that policy
    if sum([wks[0] for wks in would_keep_shares]) == 0:
        self.increment_bucketspace("original", bucket_diskbytes, sharetype)
    if sum([wks[1] for wks in would_keep_shares]) == 0:
        self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
    if sum([wks[2] for wks in would_keep_shares]) == 0:
        self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
def process_share(self, sharefilename):
    """Examine one share file's leases; cancel expired ones if enabled.

    Returns a 4-element list [keep_original, keep_configured, keep_actual,
    sharetype]: the first three are 1/0 flags saying whether the share
    would survive under the original lease timers, under our configured
    policy, and in actual fact (configured policy + expiration_enabled).
    """
    # first, find out what kind of a share it is
    sf = get_share_file(sharefilename)
    sharetype = sf.sharetype
    now = time.time()
    s = self.stat(sharefilename)

    num_leases = 0
    num_valid_leases_original = 0
    num_valid_leases_configured = 0
    expired_leases_configured = []

    for li in sf.get_leases():
        num_leases += 1
        original_expiration_time = li.get_expiration_time()
        grant_renew_time = li.get_grant_renew_time_time()
        age = li.get_age()
        self.add_lease_age_to_histogram(age)

        # expired-or-not according to original expiration time
        if original_expiration_time > now:
            num_valid_leases_original += 1

        # expired-or-not according to our configured age limit
        expired = False
        if self.mode == "age":
            age_limit = original_expiration_time
            if self.override_lease_duration is not None:
                age_limit = self.override_lease_duration
            if age > age_limit:
                expired = True
        else:
            assert self.mode == "cutoff-date"
            if grant_renew_time < self.cutoff_date:
                expired = True
        # shares of a type we are not configured to expire never expire
        if sharetype not in self.sharetypes_to_expire:
            expired = False

        if expired:
            expired_leases_configured.append(li)
        else:
            num_valid_leases_configured += 1

    so_far = self.state["cycle-to-date"]
    self.increment(so_far["leases-per-share-histogram"], num_leases, 1)
    self.increment_space("examined", s, sharetype)

    would_keep_share = [1, 1, 1, sharetype]

    if self.expiration_enabled:
        for li in expired_leases_configured:
            sf.cancel_lease(li.cancel_secret)

    if num_valid_leases_original == 0:
        would_keep_share[0] = 0
        self.increment_space("original", s, sharetype)

    if num_valid_leases_configured == 0:
        would_keep_share[1] = 0
        self.increment_space("configured", s, sharetype)
        # "actual" deletion only happens when expiration is enabled
        if self.expiration_enabled:
            would_keep_share[2] = 0
            self.increment_space("actual", s, sharetype)

    return would_keep_share
def increment_space(self, a, s, sharetype):
    """Add one share's size (from stat result 's') to category 'a' counters.

    Updates '<a>-shares', '<a>-sharebytes', '<a>-diskbytes' in the
    cycle-to-date space-recovered dict, plus the per-sharetype variants
    when 'sharetype' is truthy.
    """
    sharebytes = s.st_size
    try:
        # note that stat(2) says that st_blocks is 512 bytes, and that
        # st_blksize is "optimal file sys I/O ops blocksize", which is
        # independent of the block-size that st_blocks uses.
        diskbytes = s.st_blocks * 512
    except AttributeError:
        # the docs say that st_blocks is only on linux. I also see it on
        # MacOS. But it isn't available on windows.
        diskbytes = sharebytes
    so_far_sr = self.state["cycle-to-date"]["space-recovered"]
    self.increment(so_far_sr, a+"-shares", 1)
    self.increment(so_far_sr, a+"-sharebytes", sharebytes)
    self.increment(so_far_sr, a+"-diskbytes", diskbytes)
    if sharetype:
        self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
        self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
        self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
    """Add one bucket's disk usage to category 'a' counters.

    Updates '<a>-diskbytes' and '<a>-buckets' in the cycle-to-date
    space-recovered dict, plus per-sharetype variants when 'sharetype'
    is truthy.
    """
    rec = self.state["cycle-to-date"]["space-recovered"]
    self.increment(rec, a+"-diskbytes", bucket_diskbytes)
    self.increment(rec, a+"-buckets", 1)
    if sharetype:
        self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
        self.increment(rec, a+"-buckets-"+sharetype, 1)
def increment(self, d, k, delta=1):
    """Add 'delta' to d[k], treating a missing key as zero."""
    if k not in d:
        d[k] = 0
    d[k] += delta
def add_lease_age_to_histogram(self, age):
    """Count a lease of the given age (seconds) in the 1-day-wide histogram.

    The histogram maps (bucket_start, bucket_end) second-offsets to counts.
    """
    bucket_interval = 24*60*60  # one-day buckets
    bucket_number = int(age/bucket_interval)
    bucket_start = bucket_number * bucket_interval
    bucket_end = bucket_start + bucket_interval
    k = (bucket_start, bucket_end)
    self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
def convert_lease_age_histogram(self, lah):
    """Return the histogram as a sorted list of (minage, maxage, count).

    The { (minage,maxage): count } dict form is not JSON-safe (JSON
    dictionaries must have string keys), so callers that serialize use
    this list-of-tuples form instead.
    """
    # a comprehension over the sorted keys replaces the manual append loop
    return [(minage, maxage, lah[(minage, maxage)])
            for (minage, maxage) in sorted(lah)]
def finished_cycle(self, cycle):
    """Record this cycle's statistics in the history file; prune to 10 entries.

    Builds a history entry (timings, configuration, histograms, corrupt
    shares, space recovered) and appends it to the pickled history dict,
    keyed by cycle number.
    """
    # add to our history state, prune old history
    h = {}

    start = self.state["current-cycle-start-time"]
    now = time.time()
    h["cycle-start-finish-times"] = (start, now)
    h["expiration-enabled"] = self.expiration_enabled
    h["configured-expiration-mode"] = (self.mode,
                                       self.override_lease_duration,
                                       self.cutoff_date,
                                       self.sharetypes_to_expire)

    s = self.state["cycle-to-date"]

    # state["lease-age-histogram"] is a dictionary (mapping
    # (minage,maxage) tuple to a sharecount), but we report
    # self.get_state()["lease-age-histogram"] as a list of
    # (min,max,sharecount) tuples, because JSON can handle that better.
    # We record the list-of-tuples form into the history for the same
    # reason.
    lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
    h["lease-age-histogram"] = lah
    h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
    h["corrupt-shares"] = s["corrupt-shares"][:]
    # note: if ["shares-recovered"] ever acquires an internal dict, this
    # copy() needs to become a deepcopy
    h["space-recovered"] = s["space-recovered"].copy()

    # NOTE: pickle is only safe here because historyfile is a local file
    # we wrote ourselves; never feed it untrusted data
    with open(self.historyfile, "rb") as f:
        history = pickle.load(f)
    history[cycle] = h
    # keep only the 10 most recent cycles (keys are cycle numbers)
    while len(history) > 10:
        del history[min(history)]
    with open(self.historyfile, "wb") as f:
        pickle.dump(history, f)
def get_state(self):
    """In addition to the crawler state described in
    ShareCrawler.get_state(), I return the following keys which are
    specific to the lease-checker/expirer. Note that the non-history keys
    (with 'cycle' in their names) are only present if a cycle is currently
    running. If the crawler is between cycles, it is appropriate to show
    the latest item in the 'history' key instead. Also note that each
    history item has all the data in the 'cycle-to-date' value, plus
    cycle-start-finish-times.

     cycle-to-date:
      expiration-enabled
      configured-expiration-mode
      lease-age-histogram (list of (minage,maxage,sharecount) tuples)
      leases-per-share-histogram
      corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
      space-recovered

     estimated-remaining-cycle:
      # Values may be None if not enough data has been gathered to
      # produce an estimate.
      space-recovered

     estimated-current-cycle:
      # cycle-to-date plus estimated-remaining. Values may be None if
      # not enough data has been gathered to produce an estimate.
      space-recovered

     history: maps cyclenum to a dict with the following keys:
      cycle-start-finish-times
      expiration-enabled
      configured-expiration-mode
      lease-age-histogram
      leases-per-share-histogram
      corrupt-shares
      space-recovered

    The 'space-recovered' structure is a dictionary with the following
    keys:
     # 'examined' is what was looked at
     examined-buckets, examined-buckets-mutable, examined-buckets-immutable
     examined-shares, -mutable, -immutable
     examined-sharebytes, -mutable, -immutable
     examined-diskbytes, -mutable, -immutable

     # 'actual' is what was actually deleted
     actual-buckets, -mutable, -immutable
     actual-shares, -mutable, -immutable
     actual-sharebytes, -mutable, -immutable
     actual-diskbytes, -mutable, -immutable

     # would have been deleted, if the original lease timer was used
     original-buckets, -mutable, -immutable
     original-shares, -mutable, -immutable
     original-sharebytes, -mutable, -immutable
     original-diskbytes, -mutable, -immutable

     # would have been deleted, if our configured max_age was used
     configured-buckets, -mutable, -immutable
     configured-shares, -mutable, -immutable
     configured-sharebytes, -mutable, -immutable
     configured-diskbytes, -mutable, -immutable
    """
    progress = self.get_progress()

    state = ShareCrawler.get_state(self)  # does a shallow copy
    # pickle is safe only because historyfile is our own local file
    with open(self.historyfile, "rb") as f:
        history = pickle.load(f)
    state["history"] = history

    if not progress["cycle-in-progress"]:
        del state["cycle-to-date"]
        return state

    # copy before mutating, so the crawler's own state is untouched
    so_far = state["cycle-to-date"].copy()
    state["cycle-to-date"] = so_far

    lah = so_far["lease-age-histogram"]
    so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
    so_far["expiration-enabled"] = self.expiration_enabled
    so_far["configured-expiration-mode"] = (self.mode,
                                            self.override_lease_duration,
                                            self.cutoff_date,
                                            self.sharetypes_to_expire)

    so_far_sr = so_far["space-recovered"]
    remaining_sr = {}
    remaining = {"space-recovered": remaining_sr}
    cycle_sr = {}
    cycle = {"space-recovered": cycle_sr}

    if progress["cycle-complete-percentage"] > 0.0:
        # extrapolate linearly from the fraction of the cycle completed
        pc = progress["cycle-complete-percentage"] / 100.0
        m = (1-pc)/pc
        for a in ("actual", "original", "configured", "examined"):
            for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                for c in ("", "-mutable", "-immutable"):
                    k = a+"-"+b+c
                    remaining_sr[k] = m * so_far_sr[k]
                    cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
    else:
        # not enough data yet: every estimate is None
        for a in ("actual", "original", "configured", "examined"):
            for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                for c in ("", "-mutable", "-immutable"):
                    k = a+"-"+b+c
                    remaining_sr[k] = None
                    cycle_sr[k] = None

    state["estimated-remaining-cycle"] = remaining
    state["estimated-current-cycle"] = cycle
    return state