]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/crawler.py
Failing to load a crawler state pickle uses default values, but the exception clause...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / crawler.py
1
2 import os, time, struct
3 import cPickle as pickle
4 from twisted.internet import reactor
5 from twisted.application import service
6 from allmydata.storage.common import si_b2a
7 from allmydata.util import fileutil
8
9 class TimeSliceExceeded(Exception):
10     pass
11
12 class ShareCrawler(service.MultiService):
13     """A ShareCrawler subclass is attached to a StorageServer, and
14     periodically walks all of its shares, processing each one in some
15     fashion. This crawl is rate-limited, to reduce the IO burden on the host,
16     since large servers can easily have a terabyte of shares, in several
17     million files, which can take hours or days to read.
18
19     Once the crawler starts a cycle, it will proceed at a rate limited by the
20     allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
21     after it has worked for 'cpu_slice' seconds, and not resuming right away,
22     always trying to use less than 'allowed_cpu_percentage'.
23
24     Once the crawler finishes a cycle, it will put off starting the next one
25     long enough to ensure that 'minimum_cycle_time' elapses between the start
26     of two consecutive cycles.
27
28     We assume that the normal upload/download/get_buckets traffic of a tahoe
29     grid will cause the prefixdir contents to be mostly cached in the kernel,
30     or that the number of buckets in each prefixdir will be small enough to
31     load quickly. A 1TB allmydata.com server was measured to have 2.56M
32     buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
33     prefix. On this server, each prefixdir took 130ms-200ms to list the first
34     time, and 17ms to list the second time.
35
36     To use a crawler, create a subclass which implements the process_bucket()
37     method. It will be called with a prefixdir and a base32 storage index
38     string. process_bucket() must run synchronously. Any keys added to
39     self.state will be preserved. Override add_initial_state() to set up
40     initial state keys. Override finished_cycle() to perform additional
41     processing when the cycle is complete. Any status that the crawler
42     produces should be put in the self.state dictionary. Status renderers
43     (like a web page which describes the accomplishments of your crawler)
44     will use crawler.get_state() to retrieve this dictionary; they can
45     present the contents as they see fit.
46
47     Then create an instance, with a reference to a StorageServer and a
48     filename where it can store persistent state. The statefile is used to
49     keep track of how far around the ring the process has travelled, as well
50     as timing history to allow the pace to be predicted and controlled. The
51     statefile will be updated and written to disk after each time slice (just
52     before the crawler yields to the reactor), and also after each cycle is
53     finished, and also when stopService() is called. Note that this means
54     that a crawler which is interrupted with SIGKILL while it is in the
55     middle of a time slice will lose progress: the next time the node is
56     started, the crawler will repeat some unknown amount of work.
57
58     The crawler instance must be started with startService() before it will
59     do any work. To make it stop doing work, call stopService().
60     """
61
62     slow_start = 300 # don't start crawling for 5 minutes after startup
63     # all three of these can be changed at any time
64     allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
65     cpu_slice = 1.0 # use up to 1.0 seconds before yielding
66     minimum_cycle_time = 300 # don't run a cycle faster than this
67
68     def __init__(self, server, statefile, allowed_cpu_percentage=None):
69         service.MultiService.__init__(self)
70         if allowed_cpu_percentage is not None:
71             self.allowed_cpu_percentage = allowed_cpu_percentage
72         self.server = server
73         self.sharedir = server.sharedir
74         self.statefile = statefile
75         self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
76                          for i in range(2**10)]
77         self.prefixes.sort()
78         self.timer = None
79         self.bucket_cache = (None, [])
80         self.current_sleep_time = None
81         self.next_wake_time = None
82         self.last_prefix_finished_time = None
83         self.last_prefix_elapsed_time = None
84         self.last_cycle_started_time = None
85         self.last_cycle_elapsed_time = None
86         self.load_state()
87
88     def minus_or_none(self, a, b):
89         if a is None:
90             return None
91         return a-b
92
93     def get_progress(self):
94         """I return information about how much progress the crawler is
95         making. My return value is a dictionary. The primary key is
96         'cycle-in-progress': True if the crawler is currently traversing the
97         shares, False if it is idle between cycles.
98
99         Note that any of these 'time' keys could be None if I am called at
100         certain moments, so application code must be prepared to tolerate
101         this case. The estimates will also be None if insufficient data has
102         been gatherered to form an estimate.
103
104         If cycle-in-progress is True, the following keys will be present::
105
106          cycle-complete-percentage': float, from 0.0 to 100.0, indicating how
107                                      far the crawler has progressed through
108                                      the current cycle
109          remaining-sleep-time: float, seconds from now when we do more work
110          estimated-cycle-complete-time-left:
111                 float, seconds remaining until the current cycle is finished.
112                 TODO: this does not yet include the remaining time left in
113                 the current prefixdir, and it will be very inaccurate on fast
114                 crawlers (which can process a whole prefix in a single tick)
115          estimated-time-per-cycle: float, seconds required to do a complete
116                                    cycle
117
118         If cycle-in-progress is False, the following keys are available::
119
120          next-crawl-time: float, seconds-since-epoch when next crawl starts
121          remaining-wait-time: float, seconds from now when next crawl starts
122          estimated-time-per-cycle: float, seconds required to do a complete
123                                    cycle
124         """
125
126         d = {}
127
128         if self.state["current-cycle"] is None:
129             d["cycle-in-progress"] = False
130             d["next-crawl-time"] = self.next_wake_time
131             d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
132                                                           time.time())
133         else:
134             d["cycle-in-progress"] = True
135             pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
136             d["cycle-complete-percentage"] = pct
137             remaining = None
138             if self.last_prefix_elapsed_time is not None:
139                 left = len(self.prefixes) - self.last_complete_prefix_index
140                 remaining = left * self.last_prefix_elapsed_time
141                 # TODO: remainder of this prefix: we need to estimate the
142                 # per-bucket time, probably by measuring the time spent on
143                 # this prefix so far, divided by the number of buckets we've
144                 # processed.
145             d["estimated-cycle-complete-time-left"] = remaining
146             # it's possible to call get_progress() from inside a crawler's
147             # finished_prefix() function
148             d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
149                                                            time.time())
150         per_cycle = None
151         if self.last_cycle_elapsed_time is not None:
152             per_cycle = self.last_cycle_elapsed_time
153         elif self.last_prefix_elapsed_time is not None:
154             per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
155         d["estimated-time-per-cycle"] = per_cycle
156         return d
157
158     def get_state(self):
159         """I return the current state of the crawler. This is a copy of my
160         state dictionary.
161
162         If we are not currently sleeping (i.e. get_state() was called from
163         inside the process_prefixdir, process_bucket, or finished_cycle()
164         methods, or if startService has not yet been called on this crawler),
165         these two keys will be None.
166
167         Subclasses can override this to add computed keys to the return value,
168         but don't forget to start with the upcall.
169         """
170         state = self.state.copy() # it isn't a deepcopy, so don't go crazy
171         return state
172
173     def load_state(self):
174         # we use this to store state for both the crawler's internals and
175         # anything the subclass-specific code needs. The state is stored
176         # after each bucket is processed, after each prefixdir is processed,
177         # and after a cycle is complete. The internal keys we use are:
178         #  ["version"]: int, always 1
179         #  ["last-cycle-finished"]: int, or None if we have not yet finished
180         #                           any cycle
181         #  ["current-cycle"]: int, or None if we are sleeping between cycles
182         #  ["current-cycle-start-time"]: int, seconds-since-epoch of when this
183         #                                cycle was started, possibly by an earlier
184         #                                process
185         #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
186         #                            that was fully processed, or None if we
187         #                            are sleeping between cycles, or if we
188         #                            have not yet finished any prefixdir since
189         #                            a cycle was started
190         #  ["last-complete-bucket"]: str, base32 storage index bucket name
191         #                            of the last bucket to be processed, or
192         #                            None if we are sleeping between cycles
193         try:
194             f = open(self.statefile, "rb")
195             state = pickle.load(f)
196             f.close()
197         except Exception:
198             state = {"version": 1,
199                      "last-cycle-finished": None,
200                      "current-cycle": None,
201                      "last-complete-prefix": None,
202                      "last-complete-bucket": None,
203                      }
204         state.setdefault("current-cycle-start-time", time.time()) # approximate
205         self.state = state
206         lcp = state["last-complete-prefix"]
207         if lcp == None:
208             self.last_complete_prefix_index = -1
209         else:
210             self.last_complete_prefix_index = self.prefixes.index(lcp)
211         self.add_initial_state()
212
213     def add_initial_state(self):
214         """Hook method to add extra keys to self.state when first loaded.
215
216         The first time this Crawler is used, or when the code has been
217         upgraded, the saved state file may not contain all the keys you
218         expect. Use this method to add any missing keys. Simply modify
219         self.state as needed.
220
221         This method for subclasses to override. No upcall is necessary.
222         """
223         pass
224
225     def save_state(self):
226         lcpi = self.last_complete_prefix_index
227         if lcpi == -1:
228             last_complete_prefix = None
229         else:
230             last_complete_prefix = self.prefixes[lcpi]
231         self.state["last-complete-prefix"] = last_complete_prefix
232         tmpfile = self.statefile + ".tmp"
233         f = open(tmpfile, "wb")
234         pickle.dump(self.state, f)
235         f.close()
236         fileutil.move_into_place(tmpfile, self.statefile)
237
238     def startService(self):
239         # arrange things to look like we were just sleeping, so
240         # status/progress values work correctly
241         self.sleeping_between_cycles = True
242         self.current_sleep_time = self.slow_start
243         self.next_wake_time = time.time() + self.slow_start
244         self.timer = reactor.callLater(self.slow_start, self.start_slice)
245         service.MultiService.startService(self)
246
247     def stopService(self):
248         if self.timer:
249             self.timer.cancel()
250             self.timer = None
251         self.save_state()
252         return service.MultiService.stopService(self)
253
254     def start_slice(self):
255         start_slice = time.time()
256         self.timer = None
257         self.sleeping_between_cycles = False
258         self.current_sleep_time = None
259         self.next_wake_time = None
260         try:
261             self.start_current_prefix(start_slice)
262             finished_cycle = True
263         except TimeSliceExceeded:
264             finished_cycle = False
265         self.save_state()
266         if not self.running:
267             # someone might have used stopService() to shut us down
268             return
269         # either we finished a whole cycle, or we ran out of time
270         now = time.time()
271         this_slice = now - start_slice
272         # this_slice/(this_slice+sleep_time) = percentage
273         # this_slice/percentage = this_slice+sleep_time
274         # sleep_time = (this_slice/percentage) - this_slice
275         sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
276         # if the math gets weird, or a timequake happens, don't sleep
277         # forever. Note that this means that, while a cycle is running, we
278         # will process at least one bucket every 5 minutes, no matter how
279         # long that bucket takes.
280         sleep_time = max(0.0, min(sleep_time, 299))
281         if finished_cycle:
282             # how long should we sleep between cycles? Don't run faster than
283             # allowed_cpu_percentage says, but also run faster than
284             # minimum_cycle_time
285             self.sleeping_between_cycles = True
286             sleep_time = max(sleep_time, self.minimum_cycle_time)
287         else:
288             self.sleeping_between_cycles = False
289         self.current_sleep_time = sleep_time # for status page
290         self.next_wake_time = now + sleep_time
291         self.yielding(sleep_time)
292         self.timer = reactor.callLater(sleep_time, self.start_slice)
293
294     def start_current_prefix(self, start_slice):
295         state = self.state
296         if state["current-cycle"] is None:
297             self.last_cycle_started_time = time.time()
298             state["current-cycle-start-time"] = self.last_cycle_started_time
299             if state["last-cycle-finished"] is None:
300                 state["current-cycle"] = 0
301             else:
302                 state["current-cycle"] = state["last-cycle-finished"] + 1
303             self.started_cycle(state["current-cycle"])
304         cycle = state["current-cycle"]
305
306         for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
307             # if we want to yield earlier, just raise TimeSliceExceeded()
308             prefix = self.prefixes[i]
309             prefixdir = os.path.join(self.sharedir, prefix)
310             if i == self.bucket_cache[0]:
311                 buckets = self.bucket_cache[1]
312             else:
313                 try:
314                     buckets = os.listdir(prefixdir)
315                     buckets.sort()
316                 except EnvironmentError:
317                     buckets = []
318                 self.bucket_cache = (i, buckets)
319             self.process_prefixdir(cycle, prefix, prefixdir,
320                                    buckets, start_slice)
321             self.last_complete_prefix_index = i
322
323             now = time.time()
324             if self.last_prefix_finished_time is not None:
325                 elapsed = now - self.last_prefix_finished_time
326                 self.last_prefix_elapsed_time = elapsed
327             self.last_prefix_finished_time = now
328
329             self.finished_prefix(cycle, prefix)
330             if time.time() >= start_slice + self.cpu_slice:
331                 raise TimeSliceExceeded()
332
333         # yay! we finished the whole cycle
334         self.last_complete_prefix_index = -1
335         self.last_prefix_finished_time = None # don't include the sleep
336         now = time.time()
337         if self.last_cycle_started_time is not None:
338             self.last_cycle_elapsed_time = now - self.last_cycle_started_time
339         state["last-complete-bucket"] = None
340         state["last-cycle-finished"] = cycle
341         state["current-cycle"] = None
342         self.finished_cycle(cycle)
343         self.save_state()
344
345     def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
346         """This gets a list of bucket names (i.e. storage index strings,
347         base32-encoded) in sorted order.
348
349         You can override this if your crawler doesn't care about the actual
350         shares, for example a crawler which merely keeps track of how many
351         buckets are being managed by this server.
352
353         Subclasses which *do* care about actual bucket should leave this
354         method along, and implement process_bucket() instead.
355         """
356
357         for bucket in buckets:
358             if bucket <= self.state["last-complete-bucket"]:
359                 continue
360             self.process_bucket(cycle, prefix, prefixdir, bucket)
361             self.state["last-complete-bucket"] = bucket
362             if time.time() >= start_slice + self.cpu_slice:
363                 raise TimeSliceExceeded()
364
365     # the remaining methods are explictly for subclasses to implement.
366
367     def started_cycle(self, cycle):
368         """Notify a subclass that the crawler is about to start a cycle.
369
370         This method is for subclasses to override. No upcall is necessary.
371         """
372         pass
373
374     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
375         """Examine a single bucket. Subclasses should do whatever they want
376         to do to the shares therein, then update self.state as necessary.
377
378         If the crawler is never interrupted by SIGKILL, this method will be
379         called exactly once per share (per cycle). If it *is* interrupted,
380         then the next time the node is started, some amount of work will be
381         duplicated, according to when self.save_state() was last called. By
382         default, save_state() is called at the end of each timeslice, and
383         after finished_cycle() returns, and when stopService() is called.
384
385         To reduce the chance of duplicate work (i.e. to avoid adding multiple
386         records to a database), you can call save_state() at the end of your
387         process_bucket() method. This will reduce the maximum duplicated work
388         to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
389         per bucket (and some disk writes), which will count against your
390         allowed_cpu_percentage, and which may be considerable if
391         process_bucket() runs quickly.
392
393         This method is for subclasses to override. No upcall is necessary.
394         """
395         pass
396
397     def finished_prefix(self, cycle, prefix):
398         """Notify a subclass that the crawler has just finished processing a
399         prefix directory (all buckets with the same two-character/10bit
400         prefix). To impose a limit on how much work might be duplicated by a
401         SIGKILL that occurs during a timeslice, you can call
402         self.save_state() here, but be aware that it may represent a
403         significant performance hit.
404
405         This method is for subclasses to override. No upcall is necessary.
406         """
407         pass
408
409     def finished_cycle(self, cycle):
410         """Notify subclass that a cycle (one complete traversal of all
411         prefixdirs) has just finished. 'cycle' is the number of the cycle
412         that just finished. This method should perform summary work and
413         update self.state to publish information to status displays.
414
415         One-shot crawlers, such as those used to upgrade shares to a new
416         format or populate a database for the first time, can call
417         self.stopService() (or more likely self.disownServiceParent()) to
418         prevent it from running a second time. Don't forget to set some
419         persistent state so that the upgrader won't be run again the next
420         time the node is started.
421
422         This method is for subclasses to override. No upcall is necessary.
423         """
424         pass
425
426     def yielding(self, sleep_time):
427         """The crawler is about to sleep for 'sleep_time' seconds. This
428         method is mostly for the convenience of unit tests.
429
430         This method is for subclasses to override. No upcall is necessary.
431         """
432         pass
433
434
435 class BucketCountingCrawler(ShareCrawler):
436     """I keep track of how many buckets are being managed by this server.
437     This is equivalent to the number of distributed files and directories for
438     which I am providing storage. The actual number of files+directories in
439     the full grid is probably higher (especially when there are more servers
440     than 'N', the number of generated shares), because some files+directories
441     will have shares on other servers instead of me. Also note that the
442     number of buckets will differ from the number of shares in small grids,
443     when more than one share is placed on a single server.
444     """
445
446     minimum_cycle_time = 60*60 # we don't need this more than once an hour
447
448     def __init__(self, server, statefile, num_sample_prefixes=1):
449         ShareCrawler.__init__(self, server, statefile)
450         self.num_sample_prefixes = num_sample_prefixes
451
452     def add_initial_state(self):
453         # ["bucket-counts"][cyclenum][prefix] = number
454         # ["last-complete-cycle"] = cyclenum # maintained by base class
455         # ["last-complete-bucket-count"] = number
456         # ["storage-index-samples"][prefix] = (cyclenum,
457         #                                      list of SI strings (base32))
458         self.state.setdefault("bucket-counts", {})
459         self.state.setdefault("last-complete-bucket-count", None)
460         self.state.setdefault("storage-index-samples", {})
461
462     def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
463         # we override process_prefixdir() because we don't want to look at
464         # the individual buckets. We'll save state after each one. On my
465         # laptop, a mostly-empty storage server can process about 70
466         # prefixdirs in a 1.0s slice.
467         if cycle not in self.state["bucket-counts"]:
468             self.state["bucket-counts"][cycle] = {}
469         self.state["bucket-counts"][cycle][prefix] = len(buckets)
470         if prefix in self.prefixes[:self.num_sample_prefixes]:
471             self.state["storage-index-samples"][prefix] = (cycle, buckets)
472
473     def finished_cycle(self, cycle):
474         last_counts = self.state["bucket-counts"].get(cycle, [])
475         if len(last_counts) == len(self.prefixes):
476             # great, we have a whole cycle.
477             num_buckets = sum(last_counts.values())
478             self.state["last-complete-bucket-count"] = num_buckets
479             # get rid of old counts
480             for old_cycle in list(self.state["bucket-counts"].keys()):
481                 if old_cycle != cycle:
482                     del self.state["bucket-counts"][old_cycle]
483         # get rid of old samples too
484         for prefix in list(self.state["storage-index-samples"].keys()):
485             old_cycle,buckets = self.state["storage-index-samples"][prefix]
486             if old_cycle != cycle:
487                 del self.state["storage-index-samples"][prefix]
488