import time
import os.path
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap.api import eventually, fireEventually

from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded

from allmydata.test.test_storage import FakeCanary
from allmydata.test.common_util import StallMixin

class BucketEnumeratingCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class PacedCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
        self.yield_cb = None
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
        if self.yield_cb:
            self.yield_cb()
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class ConsumingCrawler(ShareCrawler):
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0
    slow_start = 0

    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05)
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self, cycle):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0

class OneShotCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.counter = 0
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.counter += 1
    def finished_cycle(self, cycle):
        self.finished_d.callback(None)
        self.disownServiceParent()

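# The tests below exercise these crawler subclasses in two ways (this is just
# a summary of what follows, not additional coverage): some drive a crawler
# by hand, bypassing its normal scheduler, and some run it as a service.
# Roughly:
#
#     c = BucketEnumeratingCrawler(ss, statefile)
#     c.load_state()
#     c.start_current_prefix(time.time()) # may raise TimeSliceExceeded
#     c.save_state() # needed because we bypassed the normal scheduler
#
# versus:
#
#     c.setServiceParent(self.s)
#     d = c.finished_d # fires (via eventually) after the first cycle
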
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(str(i))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(str(i), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(str(i), serverid)

    def write(self, i, ss, serverid, tail=0):
        si = self.si(i)
        si = si[:-1] + chr(tail)
        had,made = ss.remote_allocate_buckets(si,
                                              self.rs(i, serverid),
                                              self.cs(i, serverid),
                                              set([0]), 99, FakeCanary())
        made[0].remote_write(0, "data")
        made[0].remote_close()
        return si_b2a(si)

    def test_immediate(self):
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

    def test_service(self):
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # it should be legal to call get_state() and get_progress() right
        # away, even before the first tick is performed. No work should have
        # been done yet.
        s = c.get_state()
        p = c.get_progress()
        self.failUnlessEqual(s["last-complete-prefix"], None)
        self.failUnlessEqual(s["current-cycle"], None)
        self.failUnlessEqual(p["cycle-in-progress"], False)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d

    def test_paced(self):
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)

        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets. Again we must
        # save state manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # stop it again at the end of the bucket, and check that a new crawler
        # picks up correctly
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.save_state()

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

    def test_paced_service(self):
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)

        did_check_progress = [False]
        def check_progress():
            c.yield_cb = None
            try:
                p = c.get_progress()
                self.failUnlessEqual(p["cycle-in-progress"], True)
                pct = p["cycle-complete-percentage"]
                # after 6 buckets, we happen to be at 76.17% complete. As
                # long as we create shares in deterministic order, this will
                # continue to be true.
                self.failUnlessEqual(int(pct), 76)
                left = p["remaining-sleep-time"]
                self.failUnless(isinstance(left, float), left)
                self.failUnless(left > 0.0, left)
            except Exception, e:
                did_check_progress[0] = e
            else:
                did_check_progress[0] = True
        c.yield_cb = check_progress

        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while (and
        # run check_progress()), then resume

        d = c.finished_d
        def _check(ignored):
            if did_check_progress[0] is not True:
                raise did_check_progress[0]
            self.failUnless(did_check_progress[0])
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)

            p = c.get_progress()
            self.failUnlessEqual(p["cycle-in-progress"], False)
            naptime = p["remaining-wait-time"]
            self.failUnless(isinstance(naptime, float), naptime)
            # min-cycle-time is 300, so this is basically testing that it took
            # less than 290s to crawl
            self.failUnless(naptime > 10.0, naptime)
            soon = p["next-crawl-time"] - time.time()
            self.failUnless(soon > 10.0, soon)

        d.addCallback(_check)
        return d

    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey, there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.
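        # (restating that arithmetic as an illustration only; the figures
        #  mirror the note above and nothing here is asserted by the test:
        #    work per slice      ~= 0.3s (0.1s overhead + ~4 buckets * 50ms)
        #    sleep after a slice ~= 0.3s / 0.5 = 0.6s
        #    wallclock per slice ~= 0.3s + 0.6s = 0.9s
        #    slices in 4.0s      ~= 4.0 / 0.9 = 4.4, or roughly 17 buckets)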
        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print
            print "crawler: got %d%% percent when trying for 50%%" % percent
            print "crawler: got %d full cycles" % c.cycles
        d.addCallback(_done)
        return d

    def test_empty_subclass(self):
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.slow_start = 0
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return bool(c.state["last-cycle-finished"] is not None)
        d = self.poll(_check)
        def _done(ignored):
            state = c.get_state()
            self.failUnless(state["last-cycle-finished"] is not None)
        d.addCallback(_done)
        return d


    def test_oneshot(self):
        self.basedir = "crawler/Basic/oneshot"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(30):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = OneShotCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _finished_first_cycle(ignored):
            return fireEventually(c.counter)
        d.addCallback(_finished_first_cycle)
        def _check(old_counter):
            # the crawler should not do any work after it's been stopped
            self.failUnlessEqual(old_counter, c.counter)
            self.failIf(c.running)
            self.failIf(c.timer)
            self.failIf(c.current_sleep_time)
            s = c.get_state()
            self.failUnlessEqual(s["last-cycle-finished"], 0)
            self.failUnlessEqual(s["current-cycle"], None)
        d.addCallback(_check)
        return d