]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/expirer.py
41823e7e4285f163b6e6119157e0fc8992e19f3c
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / expirer.py
1 import time, os, pickle, struct
2 from allmydata.storage.crawler import ShareCrawler
3 from allmydata.storage.shares import get_share_file
4 from allmydata.storage.common import UnknownMutableContainerVersionError, \
5      UnknownImmutableContainerVersionError
6 from twisted.python import log as twlog
7
class LeaseCheckingCrawler(ShareCrawler):
    """I examine the leases on all shares, determining which are still valid
    and which have expired. I can remove the expired leases (if so
    configured), and the share will be deleted when the last lease is
    removed.

    I collect statistics on the leases and make these available to a web
    status page, including::

    Space recovered during this cycle-so-far:
     actual (only if expiration_enabled=True):
      num-buckets, num-shares, sum of share sizes, real disk usage
      ('real disk usage' means we use stat(fn).st_blocks*512 and include any
       space used by the directory)
     what it would have been with the original lease expiration time
     what it would have been with our configured expiration time

    Prediction of space that will be recovered during the rest of this cycle
    Prediction of space that will be recovered by the entire current cycle.

    Space recovered during the last 10 cycles  <-- saved in separate pickle

    Shares/buckets examined:
     this cycle-so-far
     prediction of rest of cycle
     during last 10 cycles <-- separate pickle
    start/finish time of last 10 cycles  <-- separate pickle
    expiration time used for last 10 cycles <-- separate pickle

    Histogram of leases-per-share:
     this-cycle-to-date
     last 10 cycles <-- separate pickle
    Histogram of lease ages, buckets = 1day
     cycle-to-date
     last 10 cycles <-- separate pickle

    All cycle-to-date values remain valid until the start of the next cycle.

    """

    slow_start = 360 # wait 6 minutes after startup
    minimum_cycle_time = 12*60*60 # not more than twice per day

    def __init__(self, server, statefile, historyfile,
                 expiration_enabled, mode,
                 override_lease_duration, # used if expiration_mode=="age"
                 cutoff_date, # used if expiration_mode=="cutoff-date"
                 sharetypes):
        """Configure the lease checker and hand off to ShareCrawler.

        `mode` must be 'age' or 'cutoff-date'; anything else raises
        ValueError. Only one of `override_lease_duration` (seconds, or
        None) and `cutoff_date` (seconds-since-epoch int) is consulted,
        depending on `mode`. `sharetypes` lists the share types ('mutable',
        'immutable') that are eligible for expiration; `historyfile` is a
        pickle of the last-10-cycles history.
        """
        self.historyfile = historyfile
        self.expiration_enabled = expiration_enabled
        self.mode = mode
        # exactly one of these two is set below, according to self.mode
        self.override_lease_duration = None
        self.cutoff_date = None
        if self.mode == "age":
            assert isinstance(override_lease_duration, (int, type(None)))
            self.override_lease_duration = override_lease_duration # seconds
        elif self.mode == "cutoff-date":
            assert isinstance(cutoff_date, int) # seconds-since-epoch
            # NOTE(review): the isinstance assert above already rejects
            # None, so this second assert can never fire
            assert cutoff_date is not None
            self.cutoff_date = cutoff_date
        else:
            raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
        self.sharetypes_to_expire = sharetypes
        ShareCrawler.__init__(self, server, statefile)
72
73     def add_initial_state(self):
74         # we fill ["cycle-to-date"] here (even though they will be reset in
75         # self.started_cycle) just in case someone grabs our state before we
76         # get started: unit tests do this
77         so_far = self.create_empty_cycle_dict()
78         self.state.setdefault("cycle-to-date", so_far)
79         # in case we upgrade the code while a cycle is in progress, update
80         # the keys individually
81         for k in so_far:
82             self.state["cycle-to-date"].setdefault(k, so_far[k])
83
84         # initialize history
85         if not os.path.exists(self.historyfile):
86             history = {} # cyclenum -> dict
87             f = open(self.historyfile, "wb")
88             pickle.dump(history, f)
89             f.close()
90
91     def create_empty_cycle_dict(self):
92         recovered = self.create_empty_recovered_dict()
93         so_far = {"corrupt-shares": [],
94                   "space-recovered": recovered,
95                   "lease-age-histogram": {}, # (minage,maxage)->count
96                   "leases-per-share-histogram": {}, # leasecount->numshares
97                   }
98         return so_far
99
100     def create_empty_recovered_dict(self):
101         recovered = {}
102         for a in ("actual", "original", "configured", "examined"):
103             for b in ("buckets", "shares", "sharebytes", "diskbytes"):
104                 recovered[a+"-"+b] = 0
105                 recovered[a+"-"+b+"-mutable"] = 0
106                 recovered[a+"-"+b+"-immutable"] = 0
107         return recovered
108
109     def started_cycle(self, cycle):
110         self.state["cycle-to-date"] = self.create_empty_cycle_dict()
111
    def stat(self, fn):
        # thin wrapper around os.stat() so unit tests can override it
        return os.stat(fn)
114
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine every share in one bucket (storage-index directory) and
        update the cycle-to-date counters.

        Each share yields wks = (keep-by-original-lease,
        keep-by-configured-lease, actually-kept, sharetype) from
        process_share(). If no share in the bucket survives under a given
        policy, the whole bucket's disk usage is counted as recovered
        under that policy.
        """
        bucketdir = os.path.join(prefixdir, storage_index_b32)
        s = self.stat(bucketdir)
        would_keep_shares = []
        wks = None

        for fn in os.listdir(bucketdir):
            try:
                shnum = int(fn)
            except ValueError:
                continue # non-numeric means not a sharefile
            sharefile = os.path.join(bucketdir, fn)
            try:
                wks = self.process_share(sharefile)
            except (UnknownMutableContainerVersionError,
                    UnknownImmutableContainerVersionError,
                    struct.error):
                twlog.msg("lease-checker error processing %s" % sharefile)
                twlog.err()
                which = (storage_index_b32, shnum)
                self.state["cycle-to-date"]["corrupt-shares"].append(which)
                # pretend the corrupt share was fully kept, so its bucket is
                # never counted as recovered space
                wks = (1, 1, 1, "unknown")
            would_keep_shares.append(wks)

        sharetype = None
        if wks:
            # use the last share's sharetype as the buckettype
            sharetype = wks[3]
        rec = self.state["cycle-to-date"]["space-recovered"]
        self.increment(rec, "examined-buckets", 1)
        if sharetype:
            self.increment(rec, "examined-buckets-"+sharetype, 1)

        try:
            bucket_diskbytes = s.st_blocks * 512
        except AttributeError:
            bucket_diskbytes = 0 # no stat().st_blocks on windows
        # a bucket counts as recovered only when *none* of its shares would
        # be kept under that policy (note sum([]) == 0, so a bucket with no
        # numeric sharefiles trips all three)
        if sum([wks[0] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("original", bucket_diskbytes, sharetype)
        if sum([wks[1] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
        if sum([wks[2] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
158
    def process_share(self, sharefilename):
        """Examine (and possibly expire) the leases on a single share.

        Updates the lease-age and leases-per-share histograms and the
        'examined' space counters; if expiration is enabled, cancels every
        lease judged expired under the configured policy.

        Returns a 4-element list [keep_original, keep_configured,
        keep_actual, sharetype], where the first three are 1/0 flags:
        would the share survive under the original lease times, under the
        configured policy, and in actuality.
        """
        # first, find out what kind of a share it is
        sf = get_share_file(sharefilename)
        sharetype = sf.sharetype
        now = time.time()
        s = self.stat(sharefilename)

        num_leases = 0
        num_valid_leases_original = 0
        num_valid_leases_configured = 0
        expired_leases_configured = []

        for li in sf.get_leases():
            num_leases += 1
            original_expiration_time = li.get_expiration_time()
            grant_renew_time = li.get_grant_renew_time_time()
            age = li.get_age()
            self.add_lease_age_to_histogram(age)

            #  expired-or-not according to original expiration time
            if original_expiration_time > now:
                num_valid_leases_original += 1

            #  expired-or-not according to our configured age limit
            expired = False
            if self.mode == "age":
                # NOTE(review): when no override is configured, this
                # compares age (a duration) against
                # original_expiration_time, which the branch above treats
                # as an absolute timestamp -- confirm the lease API's units
                age_limit = original_expiration_time
                if self.override_lease_duration is not None:
                    age_limit = self.override_lease_duration
                if age > age_limit:
                    expired = True
            else:
                assert self.mode == "cutoff-date"
                if grant_renew_time < self.cutoff_date:
                    expired = True
            # share types we are not configured to expire are always
            # treated as un-expired
            if sharetype not in self.sharetypes_to_expire:
                expired = False

            if expired:
                expired_leases_configured.append(li)
            else:
                num_valid_leases_configured += 1

        so_far = self.state["cycle-to-date"]
        self.increment(so_far["leases-per-share-histogram"], num_leases, 1)
        self.increment_space("examined", s, sharetype)

        would_keep_share = [1, 1, 1, sharetype]

        if self.expiration_enabled:
            for li in expired_leases_configured:
                sf.cancel_lease(li.cancel_secret)

        if num_valid_leases_original == 0:
            would_keep_share[0] = 0
            self.increment_space("original", s, sharetype)

        if num_valid_leases_configured == 0:
            would_keep_share[1] = 0
            self.increment_space("configured", s, sharetype)
            # the share is only *actually* removed when expiration is
            # enabled (per the class docstring, the share is deleted when
            # its last lease is removed)
            if self.expiration_enabled:
                would_keep_share[2] = 0
                self.increment_space("actual", s, sharetype)

        return would_keep_share
224
225     def increment_space(self, a, s, sharetype):
226         sharebytes = s.st_size
227         try:
228             # note that stat(2) says that st_blocks is 512 bytes, and that
229             # st_blksize is "optimal file sys I/O ops blocksize", which is
230             # independent of the block-size that st_blocks uses.
231             diskbytes = s.st_blocks * 512
232         except AttributeError:
233             # the docs say that st_blocks is only on linux. I also see it on
234             # MacOS. But it isn't available on windows.
235             diskbytes = sharebytes
236         so_far_sr = self.state["cycle-to-date"]["space-recovered"]
237         self.increment(so_far_sr, a+"-shares", 1)
238         self.increment(so_far_sr, a+"-sharebytes", sharebytes)
239         self.increment(so_far_sr, a+"-diskbytes", diskbytes)
240         if sharetype:
241             self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
242             self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
243             self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
244
245     def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
246         rec = self.state["cycle-to-date"]["space-recovered"]
247         self.increment(rec, a+"-diskbytes", bucket_diskbytes)
248         self.increment(rec, a+"-buckets", 1)
249         if sharetype:
250             self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
251             self.increment(rec, a+"-buckets-"+sharetype, 1)
252
253     def increment(self, d, k, delta=1):
254         if k not in d:
255             d[k] = 0
256         d[k] += delta
257
258     def add_lease_age_to_histogram(self, age):
259         bucket_interval = 24*60*60
260         bucket_number = int(age/bucket_interval)
261         bucket_start = bucket_number * bucket_interval
262         bucket_end = bucket_start + bucket_interval
263         k = (bucket_start, bucket_end)
264         self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
265
266     def convert_lease_age_histogram(self, lah):
267         # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
268         # since the former is not JSON-safe (JSON dictionaries must have
269         # string keys).
270         json_safe_lah = []
271         for k in sorted(lah):
272             (minage,maxage) = k
273             json_safe_lah.append( (minage, maxage, lah[k]) )
274         return json_safe_lah
275
276     def finished_cycle(self, cycle):
277         # add to our history state, prune old history
278         h = {}
279
280         start = self.state["current-cycle-start-time"]
281         now = time.time()
282         h["cycle-start-finish-times"] = (start, now)
283         h["expiration-enabled"] = self.expiration_enabled
284         h["configured-expiration-mode"] = (self.mode,
285                                            self.override_lease_duration,
286                                            self.cutoff_date,
287                                            self.sharetypes_to_expire)
288
289         s = self.state["cycle-to-date"]
290
291         # state["lease-age-histogram"] is a dictionary (mapping
292         # (minage,maxage) tuple to a sharecount), but we report
293         # self.get_state()["lease-age-histogram"] as a list of
294         # (min,max,sharecount) tuples, because JSON can handle that better.
295         # We record the list-of-tuples form into the history for the same
296         # reason.
297         lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
298         h["lease-age-histogram"] = lah
299         h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
300         h["corrupt-shares"] = s["corrupt-shares"][:]
301         # note: if ["shares-recovered"] ever acquires an internal dict, this
302         # copy() needs to become a deepcopy
303         h["space-recovered"] = s["space-recovered"].copy()
304
305         history = pickle.load(open(self.historyfile, "rb"))
306         history[cycle] = h
307         while len(history) > 10:
308             oldcycles = sorted(history.keys())
309             del history[oldcycles[0]]
310         f = open(self.historyfile, "wb")
311         pickle.dump(history, f)
312         f.close()
313
314     def get_state(self):
315         """In addition to the crawler state described in
316         ShareCrawler.get_state(), I return the following keys which are
317         specific to the lease-checker/expirer. Note that the non-history keys
318         (with 'cycle' in their names) are only present if a cycle is
319         currently running. If the crawler is between cycles, it appropriate
320         to show the latest item in the 'history' key instead. Also note that
321         each history item has all the data in the 'cycle-to-date' value, plus
322         cycle-start-finish-times.
323
324          cycle-to-date:
325           expiration-enabled
326           configured-expiration-mode
327           lease-age-histogram (list of (minage,maxage,sharecount) tuples)
328           leases-per-share-histogram
329           corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
330           space-recovered
331
332          estimated-remaining-cycle:
333           # Values may be None if not enough data has been gathered to
334           # produce an estimate.
335           space-recovered
336
337          estimated-current-cycle:
338           # cycle-to-date plus estimated-remaining. Values may be None if
339           # not enough data has been gathered to produce an estimate.
340           space-recovered
341
342          history: maps cyclenum to a dict with the following keys:
343           cycle-start-finish-times
344           expiration-enabled
345           configured-expiration-mode
346           lease-age-histogram
347           leases-per-share-histogram
348           corrupt-shares
349           space-recovered
350
351          The 'space-recovered' structure is a dictionary with the following
352          keys:
353           # 'examined' is what was looked at
354           examined-buckets, examined-buckets-mutable, examined-buckets-immutable
355           examined-shares, -mutable, -immutable
356           examined-sharebytes, -mutable, -immutable
357           examined-diskbytes, -mutable, -immutable
358
359           # 'actual' is what was actually deleted
360           actual-buckets, -mutable, -immutable
361           actual-shares, -mutable, -immutable
362           actual-sharebytes, -mutable, -immutable
363           actual-diskbytes, -mutable, -immutable
364
365           # would have been deleted, if the original lease timer was used
366           original-buckets, -mutable, -immutable
367           original-shares, -mutable, -immutable
368           original-sharebytes, -mutable, -immutable
369           original-diskbytes, -mutable, -immutable
370
371           # would have been deleted, if our configured max_age was used
372           configured-buckets, -mutable, -immutable
373           configured-shares, -mutable, -immutable
374           configured-sharebytes, -mutable, -immutable
375           configured-diskbytes, -mutable, -immutable
376
377         """
378         progress = self.get_progress()
379
380         state = ShareCrawler.get_state(self) # does a shallow copy
381         history = pickle.load(open(self.historyfile, "rb"))
382         state["history"] = history
383
384         if not progress["cycle-in-progress"]:
385             del state["cycle-to-date"]
386             return state
387
388         so_far = state["cycle-to-date"].copy()
389         state["cycle-to-date"] = so_far
390
391         lah = so_far["lease-age-histogram"]
392         so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
393         so_far["expiration-enabled"] = self.expiration_enabled
394         so_far["configured-expiration-mode"] = (self.mode,
395                                                 self.override_lease_duration,
396                                                 self.cutoff_date,
397                                                 self.sharetypes_to_expire)
398
399         so_far_sr = so_far["space-recovered"]
400         remaining_sr = {}
401         remaining = {"space-recovered": remaining_sr}
402         cycle_sr = {}
403         cycle = {"space-recovered": cycle_sr}
404
405         if progress["cycle-complete-percentage"] > 0.0:
406             pc = progress["cycle-complete-percentage"] / 100.0
407             m = (1-pc)/pc
408             for a in ("actual", "original", "configured", "examined"):
409                 for b in ("buckets", "shares", "sharebytes", "diskbytes"):
410                     for c in ("", "-mutable", "-immutable"):
411                         k = a+"-"+b+c
412                         remaining_sr[k] = m * so_far_sr[k]
413                         cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
414         else:
415             for a in ("actual", "original", "configured", "examined"):
416                 for b in ("buckets", "shares", "sharebytes", "diskbytes"):
417                     for c in ("", "-mutable", "-immutable"):
418                         k = a+"-"+b+c
419                         remaining_sr[k] = None
420                         cycle_sr[k] = None
421
422         state["estimated-remaining-cycle"] = remaining
423         state["estimated-current-cycle"] = cycle
424         return state