import time
import os.path

from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap.eventual import eventually

from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded

from test_storage import FakeCanary
from common_util import StallMixin
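
# The ShareCrawler subclasses below instrument the crawler in different ways
# so the tests can observe its progress: one simply records every bucket it
# visits, one forces timeslice overruns partway through a cycle, and one
# burns wallclock time per bucket so the rate-limiting behaviour can be
# measured.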
class BucketEnumeratingCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self):
        eventually(self.finished_d.callback, None)
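
# PacedCrawler lets a test stop the crawler partway through a cycle: after a
# countdown of buckets it forces a TimeSliceExceeded by shrinking its
# cpu_slice, then restores the slice in yielding() so the crawl can resume.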
class PacedCrawler(ShareCrawler):
    cpu_slice = 500 # make sure it can complete in a single slice
    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, prefixdir, storage_index_b32):
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
    def finished_cycle(self):
        eventually(self.finished_d.callback, None)
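
# ConsumingCrawler deliberately spends about 50ms of wallclock time in each
# process_bucket() call and records how much time it actually got, so
# OFF_test_cpu_usage can compare the crawler's share of wallclock time
# against allowed_cpu_percentage.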
class ConsumingCrawler(ShareCrawler):
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0

    def __init__(self, server, statefile):
        ShareCrawler.__init__(self, server, statefile)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05) # consume about 50ms per bucket
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0
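
# The Basic tests below seed a StorageServer with a handful of shares, then
# drive these crawlers either directly (via start_current_prefix) or as
# Twisted services, and check what they enumerate and how they persist state.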
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(str(i))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(str(i), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(str(i), serverid)

    def write(self, i, ss, serverid, tail=0):
        # write a single small share and return its base32 storage index
        si = self.si(i)
        si = si[:-1] + chr(tail)
        had,made = ss.remote_allocate_buckets(si,
                                              self.rs(i, serverid),
                                              self.cs(i, serverid),
                                              set([0]), 99, FakeCanary())
        made[0].remote_write(0, "data")
        made[0].remote_close()
        return si_b2a(si)
    def test_immediate(self):
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
    def test_service(self):
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d
    def test_paced(self):
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets.
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # stop it again at the end of the bucket, check that a new crawler
        # picks up from where the statefile says it left off
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1 # don't force another timeout this time
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
    def test_paced_service(self):
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)
        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while, then
        # resume the rest of the cycle

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)
        d.addCallback(_check)
        return d
    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey, there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.

        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print "crawler: got %d%% when trying for 50%%" % percent
            print "crawler: got %d full cycles" % c.cycles
        d.addCallback(_done)
        return d
    def test_empty_subclass(self):
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = "\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return c.first_cycle_finished
        d = self.poll(_check)