import time
import os.path

from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap.api import eventually, fireEventually

from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded

from allmydata.test.test_storage import FakeCanary
from allmydata.test.common_util import StallMixin
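
# Each crawler below subclasses ShareCrawler and overrides just enough of it
# to exercise one behavior. BucketEnumeratingCrawler simply records every
# bucket it visits and fires finished_d when a cycle completes.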
class BucketEnumeratingCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)
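
# PacedCrawler counts down as it processes buckets; when the countdown hits
# zero it forces cpu_slice negative, so the next time-check raises
# TimeSliceExceeded and tests can interrupt a crawl mid-prefix and resume it.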
class PacedCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
        self.yield_cb = None
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
        if self.yield_cb:
            self.yield_cb()
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)
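
# ConsumingCrawler burns a fixed amount of wallclock time per bucket and
# tallies it in .accumulated, so OFF_test_cpu_usage() can compare the time
# spent crawling against the allowed_cpu_percentage target.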
class ConsumingCrawler(ShareCrawler):
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0
    slow_start = 0

    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05)
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self, cycle):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0
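
# OneShotCrawler detaches itself from the parent service at the end of its
# first cycle, so no further crawling should happen after that.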
class OneShotCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.counter = 0
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.counter += 1
    def finished_cycle(self, cycle):
        self.finished_d.callback(None)
        self.disownServiceParent()
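
# These tests drive the crawlers two ways: by calling start_current_prefix()
# directly (manual, synchronous crawling), and by attaching the crawler to a
# MultiService so its own timer schedules the work, as in a real node.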
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(str(i))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(str(i), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(str(i), serverid)
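
    # write() allocates a single share and puts a little data in it, returning
    # the base32-encoded storage index that the crawler will later report.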
    def write(self, i, ss, serverid, tail=0):
        si = self.si(i)
        # vary the last byte of the storage index, so several buckets can
        # land in the same prefixdir
        si = si[:-1] + chr(tail)
        had,made = ss.remote_allocate_buckets(si,
                                              self.rs(i, serverid),
                                              self.cs(i, serverid),
                                              set([0]), 99, FakeCanary())
        made[0].remote_write(0, "data")
        made[0].remote_close()
        return si_b2a(si)

    def test_immediate(self):
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

    def test_service(self):
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # it should be legal to call get_state() and get_progress() right
        # away, even before the first tick is performed. No work should have
        # been done yet.
        s = c.get_state()
        p = c.get_progress()
        self.failUnlessEqual(s["last-complete-prefix"], None)
        self.failUnlessEqual(s["current-cycle"], None)
        self.failUnlessEqual(p["cycle-in-progress"], False)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d

    def test_paced(self):
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)

        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets. Again we must
        # save state manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # stop it again at the end of the bucket, check that a new checker
        # picks up where it left off
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.save_state()

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

    def test_paced_service(self):
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)

        did_check_progress = [False]
        def check_progress():
            c.yield_cb = None
            try:
                p = c.get_progress()
                self.failUnlessEqual(p["cycle-in-progress"], True)
                pct = p["cycle-complete-percentage"]
                # after 6 buckets, we happen to be at 76.17% complete. As
                # long as we create shares in deterministic order, this will
                # continue to be true.
                self.failUnlessEqual(int(pct), 76)
                left = p["remaining-sleep-time"]
                self.failUnless(isinstance(left, float), left)
                self.failUnless(left > 0.0, left)
            except Exception, e:
                did_check_progress[0] = e
            else:
                did_check_progress[0] = True
        c.yield_cb = check_progress

        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while (and
        # run check_progress()), then resume

        d = c.finished_d
        def _check(ignored):
            if did_check_progress[0] is not True:
                raise did_check_progress[0]
            self.failUnless(did_check_progress[0])
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)

            p = c.get_progress()
            self.failUnlessEqual(p["cycle-in-progress"], False)
            naptime = p["remaining-wait-time"]
            self.failUnless(isinstance(naptime, float), naptime)
            # min-cycle-time is 300, so this is basically testing that it took
            # less than 290s to crawl
            self.failUnless(naptime > 10.0, naptime)
            soon = p["next-crawl-time"] - time.time()
            self.failUnless(soon > 10.0, soon)

        d.addCallback(_check)
        return d
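
    # the OFF_ prefix keeps trial from discovering this as a test; rename it
    # to test_cpu_usage to re-enable it.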
    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey, there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.

        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print
            print "crawler: got %d%% percent when trying for 50%%" % percent
            print "crawler: got %d full cycles" % c.cycles
        d.addCallback(_done)
        return d

    def test_empty_subclass(self):
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.slow_start = 0
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return bool(c.state["last-cycle-finished"] is not None)
        d = self.poll(_check)
        def _done(ignored):
            state = c.get_state()
            self.failUnless(state["last-cycle-finished"] is not None)
        d.addCallback(_done)
        return d

    def test_oneshot(self):
        self.basedir = "crawler/Basic/oneshot"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = OneShotCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _finished_first_cycle(ignored):
            return fireEventually(c.counter)
        d.addCallback(_finished_first_cycle)
        def _check(old_counter):
            # the crawler should not have done any work after it stopped
            self.failUnlessEqual(old_counter, c.counter)
            self.failIf(c.running)
            self.failIf(c.timer)
            self.failIf(c.current_sleep_time)
            s = c.get_state()
            self.failUnlessEqual(s["last-cycle-finished"], 0)
            self.failUnlessEqual(s["current-cycle"], None)
        d.addCallback(_check)
        return d