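"""
Tests for allmydata.storage.crawler.ShareCrawler: bucket enumeration,
time-slice pacing, and state-file persistence across crawler restarts.
"""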

import time
import sys
import os.path
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap.eventual import eventually

from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded

from test_storage import FakeCanary
from common_util import StallMixin

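# Test doubles: minimal ShareCrawler subclasses used by the tests below.
# BucketEnumeratingCrawler records the storage index of every bucket it
# visits and fires finished_d when a full cycle completes.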
class BucketEnumeratingCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self):
        eventually(self.finished_d.callback, None)

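# PacedCrawler interrupts itself partway through a cycle: after 'countdown'
# buckets it sets cpu_slice negative to force a TimeSliceExceeded, then
# restores cpu_slice in yielding() so the next slice can finish normally.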
class PacedCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
    def finished_cycle(self):
        eventually(self.finished_d.callback, None)

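# ConsumingCrawler burns roughly 50ms of wallclock time per bucket and keeps
# a running total, so the (disabled) CPU-usage test below can compare time
# spent in process_bucket() against elapsed wallclock time.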
class ConsumingCrawler(ShareCrawler):
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0

    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05)
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0

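# The Basic test case drives the crawlers against a real StorageServer. The
# write() helper allocates one share per storage index; the 'tail' argument
# lets test_paced place several buckets under the same prefixdir.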
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(str(i))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(str(i), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(str(i), serverid)

    def write(self, i, ss, serverid, tail=0):
        si = self.si(i)
        si = si[:-1] + chr(tail)
        had,made = ss.remote_allocate_buckets(si,
                                              self.rs(i, serverid),
                                              self.cs(i, serverid),
                                              set([0]), 99, FakeCanary())
        made[0].remote_write(0, "data")
        made[0].remote_close()
        return si_b2a(si)

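    # test_immediate drives the crawler synchronously with
    # start_current_prefix() and checks that the state file lets both the
    # same instance and a freshly created crawler enumerate every bucket.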
    def test_immediate(self):
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

    def test_service(self):
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d

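    # test_paced interrupts the crawler in the middle of a cycle (via
    # PacedCrawler's countdown) and verifies that resuming, restarting, and
    # handing the state file to a new crawler all yield the full bucket set.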
    def test_paced(self):
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets.
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # stop it again at the end of the bucket, check that a new crawler
        # picks up correctly
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

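    # test_paced_service runs the same interruption scenario under the
    # reactor, with the crawler attached as a twisted service instead of
    # being driven by hand.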
    def test_paced_service(self):
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)
        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while, then
        # resume

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)
        d.addCallback(_check)
        return d

    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey: there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.

        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print
            print "crawler: got %d%% when trying for 50%%" % percent
            print "crawler: got %d full cycles" % c.cycles
        d.addCallback(_done)
        return d

    def test_empty_subclass(self):
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return c.first_cycle_finished
        d = self.poll(_check)
        return d