import os, time, struct
import cPickle as pickle
from twisted.internet import reactor
from twisted.application import service
from allmydata.storage.common import si_b2a
from allmydata.util import fileutil

class TimeSliceExceeded(Exception):
    pass

class ShareCrawler(service.MultiService):
    """A ShareCrawler subclass is attached to a StorageServer, and
    periodically walks all of its shares, processing each one in some
    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
    since large servers can easily have a terabyte of shares, in several
    million files, which can take hours or days to read.

    Once the crawler starts a cycle, it will proceed at a rate limited by the
    allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
    after it has worked for 'cpu_slice' seconds, and not resuming right away,
    always trying to use less than 'allowed_cpu_percentage'.

    Once the crawler finishes a cycle, it will put off starting the next one
    long enough to ensure that 'minimum_cycle_time' elapses between the start
    of two consecutive cycles.

    We assume that the normal upload/download/get_buckets traffic of a tahoe
    grid will cause the prefixdir contents to be mostly cached in the kernel,
    or that the number of buckets in each prefixdir will be small enough to
    load quickly. A 1TB allmydata.com server was measured to have 2.56M
    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
    prefix. On this server, each prefixdir took 130ms-200ms to list the first
    time, and 17ms to list the second time.

    To use a crawler, create a subclass which implements the process_bucket()
    method. It will be called with a prefixdir and a base32 storage index
    string. process_bucket() must run synchronously. Any keys added to
    self.state will be preserved. Override add_initial_state() to set up
    initial state keys. Override finished_cycle() to perform additional
    processing when the cycle is complete. Any status that the crawler
    produces should be put in the self.state dictionary. Status renderers
    (like a web page which describes the accomplishments of your crawler)
    will use crawler.get_state() to retrieve this dictionary; they can
    present the contents as they see fit.

    Then create an instance, with a reference to a StorageServer and a
    filename where it can store persistent state. The statefile is used to
    keep track of how far around the ring the process has travelled, as well
    as timing history to allow the pace to be predicted and controlled. The
    statefile will be updated and written to disk after each time slice (just
    before the crawler yields to the reactor), and also after each cycle is
    finished, and also when stopService() is called. Note that this means
    that a crawler which is interrupted with SIGKILL while it is in the
    middle of a time slice will lose progress: the next time the node is
    started, the crawler will repeat some unknown amount of work.

    The crawler instance must be started with startService() before it will
    do any work. To make it stop doing work, call stopService().
    """

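    # An illustrative subclass sketch (the names here are hypothetical, and
    # 'server', 'statedir', and 'parent' are assumed to exist):
    #
    #   class BucketTallyCrawler(ShareCrawler):
    #       def add_initial_state(self):
    #           self.state.setdefault("bucket-tally", 0)
    #       def started_cycle(self, cycle):
    #           self.state["bucket-tally"] = 0
    #       def process_bucket(self, cycle, prefix, prefixdir,
    #                          storage_index_b32):
    #           self.state["bucket-tally"] += 1
    #
    #   c = BucketTallyCrawler(server, os.path.join(statedir, "tally.state"))
    #   c.setServiceParent(parent) # or call c.startService() directly
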
    slow_start = 300 # don't start crawling for 5 minutes after startup
    # all three of these can be changed at any time
    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
    minimum_cycle_time = 300 # don't run a cycle faster than this
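    # Since these are plain class attributes, a subclass or a running
    # instance can tune them directly, e.g.:
    #   crawler.cpu_slice = 0.5              # yield twice as often
    #   crawler.allowed_cpu_percentage = .05 # sleep proportionally longer
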
    def __init__(self, server, statefile, allowed_cpu_percentage=None):
        service.MultiService.__init__(self)
        if allowed_cpu_percentage is not None:
            self.allowed_cpu_percentage = allowed_cpu_percentage
        self.server = server
        self.sharedir = server.sharedir
        self.statefile = statefile
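        # A prefixdir name is the first two base32 characters of a storage
        # index, i.e. its top 10 bits. Shifting i into the high bits of a
        # 16-bit word and base32-encoding the result enumerates all
        # 2**10 = 1024 possible prefixes.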
        self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
                         for i in range(2**10)]
        self.prefixes.sort()
        self.timer = None
        self.bucket_cache = (None, [])
        self.current_sleep_time = None
        self.next_wake_time = None
        self.last_prefix_finished_time = None
        self.last_prefix_elapsed_time = None
        self.last_cycle_started_time = None
        self.last_cycle_elapsed_time = None
        self.load_state()

    def minus_or_none(self, a, b):
        if a is None:
            return None
        return a-b

    def get_progress(self):
        """I return information about how much progress the crawler is
        making. My return value is a dictionary. The primary key is
        'cycle-in-progress': True if the crawler is currently traversing the
        shares, False if it is idle between cycles.

        Note that any of these 'time' keys could be None if I am called at
        certain moments, so application code must be prepared to tolerate
        this case. The estimates will also be None if insufficient data has
        been gathered to form an estimate.

        If cycle-in-progress is True, the following keys will be present::

         cycle-complete-percentage: float, from 0.0 to 100.0, indicating how
                                    far the crawler has progressed through
                                    the current cycle
         remaining-sleep-time: float, seconds from now when we do more work
         estimated-cycle-complete-time-left:
                float, seconds remaining until the current cycle is finished.
                TODO: this does not yet include the remaining time left in
                the current prefixdir, and it will be very inaccurate on fast
                crawlers (which can process a whole prefix in a single tick)
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle

        If cycle-in-progress is False, the following keys are available::

         next-crawl-time: float, seconds-since-epoch when next crawl starts
         remaining-wait-time: float, seconds from now when next crawl starts
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle
        """
        d = {}
        if self.state["current-cycle"] is None:
            d["cycle-in-progress"] = False
            d["next-crawl-time"] = self.next_wake_time
            d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
                                                          time.time())
        else:
            d["cycle-in-progress"] = True
            pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
            d["cycle-complete-percentage"] = pct
            remaining = None
            if self.last_prefix_elapsed_time is not None:
                left = len(self.prefixes) - self.last_complete_prefix_index
                remaining = left * self.last_prefix_elapsed_time
                # TODO: remainder of this prefix: we need to estimate the
                # per-bucket time, probably by measuring the time spent on
                # this prefix so far, divided by the number of buckets we've
                # touched.
            d["estimated-cycle-complete-time-left"] = remaining
            # it's possible to call get_progress() from inside a crawler's
            # finished_prefix() function
            d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
                                                           time.time())
        per_cycle = None
        if self.last_cycle_elapsed_time is not None:
            per_cycle = self.last_cycle_elapsed_time
        elif self.last_prefix_elapsed_time is not None:
            per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
        d["estimated-time-per-cycle"] = per_cycle
        return d

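    # A sketch of how a status renderer might consume get_progress()
    # (hypothetical caller, not part of this module):
    #   p = crawler.get_progress()
    #   if p["cycle-in-progress"]:
    #       print "%.1f%% of cycle done" % p["cycle-complete-percentage"]
    #   elif p["remaining-wait-time"] is not None:
    #       print "next crawl in %.0fs" % p["remaining-wait-time"]
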
    def get_state(self):
        """I return the current state of the crawler. This is a copy of my
        state dictionary.

        If we are not currently sleeping (i.e. get_state() was called from
        inside the process_prefixdir, process_bucket, or finished_cycle()
        methods, or if startService has not yet been called on this crawler),
        the sleep-related values (the current sleep time and the next wake
        time) will be None.

        Subclasses can override this to add computed keys to the return value,
        but don't forget to start with the upcall.
        """
        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
        return state

    def load_state(self):
        # we use this to store state for both the crawler's internals and
        # anything the subclass-specific code needs. The state is stored
        # after each bucket is processed, after each prefixdir is processed,
        # and after a cycle is complete. The internal keys we use are:
        #  ["version"]: int, always 1
        #  ["last-cycle-finished"]: int, or None if we have not yet finished
        #                           any cycle
        #  ["current-cycle"]: int, or None if we are sleeping between cycles
        #  ["current-cycle-start-time"]: int, seconds-since-epoch of when this
        #                                cycle was started, possibly by an
        #                                earlier process
        #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
        #                            that was fully processed, or None if we
        #                            are sleeping between cycles, or if we
        #                            have not yet finished any prefixdir since
        #                            a cycle was started
        #  ["last-complete-bucket"]: str, base32 storage index bucket name
        #                            of the last bucket to be processed, or
        #                            None if we are sleeping between cycles
        try:
            f = open(self.statefile, "rb")
            state = pickle.load(f)
            f.close()
        except EnvironmentError:
            state = {"version": 1,
                     "last-cycle-finished": None,
                     "current-cycle": None,
                     "last-complete-prefix": None,
                     "last-complete-bucket": None,
                     }
        state.setdefault("current-cycle-start-time", time.time()) # approximate
        self.state = state
        lcp = state["last-complete-prefix"]
        if lcp is None:
            self.last_complete_prefix_index = -1
        else:
            self.last_complete_prefix_index = self.prefixes.index(lcp)
        self.add_initial_state()

    def add_initial_state(self):
        """Hook method to add extra keys to self.state when first loaded.

        The first time this Crawler is used, or when the code has been
        upgraded, the saved state file may not contain all the keys you
        expect. Use this method to add any missing keys. Simply modify
        self.state as needed.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def save_state(self):
        lcpi = self.last_complete_prefix_index
        if lcpi == -1:
            last_complete_prefix = None
        else:
            last_complete_prefix = self.prefixes[lcpi]
        self.state["last-complete-prefix"] = last_complete_prefix
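        # Write to a temp file and atomically move it into place, so an
        # interruption mid-write cannot corrupt the existing statefile.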
        tmpfile = self.statefile + ".tmp"
        f = open(tmpfile, "wb")
        pickle.dump(self.state, f)
        f.close()
        fileutil.move_into_place(tmpfile, self.statefile)

    def startService(self):
        # arrange things to look like we were just sleeping, so
        # status/progress values work correctly
        self.sleeping_between_cycles = True
        self.current_sleep_time = self.slow_start
        self.next_wake_time = time.time() + self.slow_start
        self.timer = reactor.callLater(self.slow_start, self.start_slice)
        service.MultiService.startService(self)

    def stopService(self):
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.save_state()
        return service.MultiService.stopService(self)

    def start_slice(self):
        start_slice = time.time()
        self.timer = None
        self.sleeping_between_cycles = False
        self.current_sleep_time = None
        self.next_wake_time = None
        try:
            self.start_current_prefix(start_slice)
            finished_cycle = True
        except TimeSliceExceeded:
            finished_cycle = False
        self.save_state()
        if not self.running:
            # someone might have used stopService() to shut us down
            return
        # either we finished a whole cycle, or we ran out of time
        now = time.time()
        this_slice = now - start_slice
        # this_slice/(this_slice+sleep_time) = percentage
        # this_slice/percentage = this_slice+sleep_time
        # sleep_time = (this_slice/percentage) - this_slice
        sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
        # if the math gets weird, or a timequake happens, don't sleep
        # forever. Note that this means that, while a cycle is running, we
        # will process at least one bucket every 5 minutes, no matter how
        # long that bucket takes.
        sleep_time = max(0.0, min(sleep_time, 299))
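        # Worked example: with the defaults (allowed_cpu_percentage=.10,
        # cpu_slice=1.0), a slice that took 1.0s is followed by a 9.0s
        # sleep, since 1.0/(1.0+9.0) = 10%.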
        if finished_cycle:
            # how long should we sleep between cycles? Don't run faster than
            # allowed_cpu_percentage says, but also don't start the next
            # cycle sooner than minimum_cycle_time allows.
            self.sleeping_between_cycles = True
            sleep_time = max(sleep_time, self.minimum_cycle_time)
        else:
            self.sleeping_between_cycles = False
        self.current_sleep_time = sleep_time # for status page
        self.next_wake_time = now + sleep_time
        self.yielding(sleep_time)
        self.timer = reactor.callLater(sleep_time, self.start_slice)

    def start_current_prefix(self, start_slice):
        state = self.state
        if state["current-cycle"] is None:
            self.last_cycle_started_time = time.time()
            state["current-cycle-start-time"] = self.last_cycle_started_time
            if state["last-cycle-finished"] is None:
                state["current-cycle"] = 0
            else:
                state["current-cycle"] = state["last-cycle-finished"] + 1
            self.started_cycle(state["current-cycle"])
        cycle = state["current-cycle"]

        for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
            # if we want to yield earlier, just raise TimeSliceExceeded()
            prefix = self.prefixes[i]
            prefixdir = os.path.join(self.sharedir, prefix)
            if i == self.bucket_cache[0]:
                buckets = self.bucket_cache[1]
            else:
                try:
                    buckets = os.listdir(prefixdir)
                    buckets.sort()
                except EnvironmentError:
                    buckets = []
                self.bucket_cache = (i, buckets)
            self.process_prefixdir(cycle, prefix, prefixdir,
                                   buckets, start_slice)
            self.last_complete_prefix_index = i

            now = time.time()
            if self.last_prefix_finished_time is not None:
                elapsed = now - self.last_prefix_finished_time
                self.last_prefix_elapsed_time = elapsed
            self.last_prefix_finished_time = now

            self.finished_prefix(cycle, prefix)
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

        # yay! we finished the whole cycle
        self.last_complete_prefix_index = -1
        self.last_prefix_finished_time = None # don't include the sleep
        now = time.time()
        if self.last_cycle_started_time is not None:
            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
        state["last-complete-bucket"] = None
        state["last-cycle-finished"] = cycle
        state["current-cycle"] = None
        self.finished_cycle(cycle)
        self.save_state()

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        """This gets a list of bucket names (i.e. storage index strings,
        base32-encoded) in sorted order.

        You can override this if your crawler doesn't care about the actual
        shares, for example a crawler which merely keeps track of how many
        buckets are being managed by this server.

        Subclasses which *do* care about actual buckets should leave this
        method alone, and implement process_bucket() instead.
        """
        for bucket in buckets:
            if bucket <= self.state["last-complete-bucket"]:
                continue
            self.process_bucket(cycle, prefix, prefixdir, bucket)
            self.state["last-complete-bucket"] = bucket
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

    # the remaining methods are explicitly for subclasses to implement.

    def started_cycle(self, cycle):
        """Notify a subclass that the crawler is about to start a cycle.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine a single bucket. Subclasses should do whatever they want
        to do to the shares therein, then update self.state as necessary.

        If the crawler is never interrupted by SIGKILL, this method will be
        called exactly once per bucket (per cycle). If it *is* interrupted,
        then the next time the node is started, some amount of work will be
        duplicated, according to when self.save_state() was last called. By
        default, save_state() is called at the end of each timeslice, after
        finished_cycle() returns, and when stopService() is called.

        To reduce the chance of duplicate work (i.e. to avoid adding multiple
        records to a database), you can call save_state() at the end of your
        process_bucket() method. This will reduce the maximum duplicated work
        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
        per bucket (and some disk writes), which will count against your
        allowed_cpu_percentage, and which may be considerable if
        process_bucket() runs quickly.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

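    # A sketch of the per-bucket save_state() pattern described above
    # (hypothetical subclass; 'db' is assumed to exist):
    #
    #   def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
    #       db.add_record(storage_index_b32)
    #       self.save_state() # at most one bucket is repeated per SIGKILL
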
    def finished_prefix(self, cycle, prefix):
        """Notify a subclass that the crawler has just finished processing a
        prefix directory (all buckets with the same two-character/10-bit
        prefix). To impose a limit on how much work might be duplicated by a
        SIGKILL that occurs during a timeslice, you can call
        self.save_state() here, but be aware that it may represent a
        significant performance hit.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_cycle(self, cycle):
        """Notify subclass that a cycle (one complete traversal of all
        prefixdirs) has just finished. 'cycle' is the number of the cycle
        that just finished. This method should perform summary work and
        update self.state to publish information to status displays.

        One-shot crawlers, such as those used to upgrade shares to a new
        format or populate a database for the first time, can call
        self.stopService() (or more likely self.disownServiceParent()) here
        to prevent the crawler from running a second time. Don't forget to
        set some persistent state so that the upgrader won't be run again
        the next time the node is started.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def yielding(self, sleep_time):
        """The crawler is about to sleep for 'sleep_time' seconds. This
        method is mostly for the convenience of unit tests.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass


class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories for
    which I am providing storage. The actual number of files+directories in
    the full grid is probably higher (especially when there are more servers
    than 'N', the number of generated shares), because some files+directories
    will have shares on other servers instead of me. Also note that the
    number of buckets will differ from the number of shares in small grids,
    when more than one share is placed on a single server.
    """

    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["bucket-counts"][cyclenum][prefix] = number
        # ["last-complete-cycle"] = cyclenum # maintained by base class
        # ["last-complete-bucket-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("bucket-counts", {})
        self.state.setdefault("last-complete-bucket-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # we override process_prefixdir() because we don't want to look at
        # the individual buckets. We'll save state after each one. On my
        # laptop, a mostly-empty storage server can process about 70
        # prefixdirs in a 1.0s slice.
        if cycle not in self.state["bucket-counts"]:
            self.state["bucket-counts"][cycle] = {}
        self.state["bucket-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        last_counts = self.state["bucket-counts"].get(cycle, [])
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-bucket-count"] = num_buckets
            # get rid of old counts
            for old_cycle in list(self.state["bucket-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["bucket-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle, buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]
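
# An illustrative wiring sketch ('server' and 'statedir' are assumed names):
#
#   statefile = os.path.join(statedir, "bucket_counter.state")
#   counter = BucketCountingCrawler(server, statefile)
#   counter.setServiceParent(server)
#
# After a full cycle completes (at most once per hour, per minimum_cycle_time
# above), counter.get_state()["last-complete-bucket-count"] holds the total.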