2 import time, os.path, stat, re, simplejson
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
51 def make_workdir(self, name):
52 basedir = os.path.join("storage", "Bucket", name)
53 incoming = os.path.join(basedir, "tmp", "bucket")
54 final = os.path.join(basedir, "bucket")
55 fileutil.make_dirs(basedir)
56 fileutil.make_dirs(os.path.join(basedir, "tmp"))
57 return incoming, final
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnless("to peer" in repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
222 def test_readwrite_v1(self):
223 return self._do_test_readwrite("test_readwrite_v1",
224 0x24, WriteBucketProxy, ReadBucketProxy)
226 def test_readwrite_v2(self):
227 return self._do_test_readwrite("test_readwrite_v2",
228 0x44, WriteBucketProxy_v2, ReadBucketProxy)
230 class FakeDiskStorageServer(StorageServer):
231 def stat_disk(self, d):
232 return self.DISKAVAIL
234 class Server(unittest.TestCase):
237 self.sparent = LoggingServiceParent()
238 self.sparent.startService()
239 self._lease_secret = itertools.count()
241 return self.sparent.stopService()
243 def workdir(self, name):
244 basedir = os.path.join("storage", "Server", name)
247 def create(self, name, reserved_space=0, klass=StorageServer):
248 workdir = self.workdir(name)
249 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
250 stats_provider=FakeStatsProvider())
251 ss.setServiceParent(self.sparent)
254 def test_create(self):
255 ss = self.create("test_create")
257 def allocate(self, ss, storage_index, sharenums, size, canary=None):
258 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
261 canary = FakeCanary()
262 return ss.remote_allocate_buckets(storage_index,
263 renew_secret, cancel_secret,
264 sharenums, size, canary)
266 def test_large_share(self):
267 ss = self.create("test_large_share")
269 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
270 self.failUnlessEqual(already, set())
271 self.failUnlessEqual(set(writers.keys()), set([0]))
273 shnum, bucket = writers.items()[0]
274 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
275 bucket.remote_write(2**32, "ab")
276 bucket.remote_close()
278 readers = ss.remote_get_buckets("allocate")
279 reader = readers[shnum]
280 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
281 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
283 def test_dont_overfill_dirs(self):
285 This test asserts that if you add a second share whose storage index
286 share lots of leading bits with an extant share (but isn't the exact
287 same storage index), this won't add an entry to the share directory.
289 ss = self.create("test_dont_overfill_dirs")
290 already, writers = self.allocate(ss, "storageindex", [0], 10)
291 for i, wb in writers.items():
292 wb.remote_write(0, "%10d" % i)
294 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
296 children_of_storedir = set(os.listdir(storedir))
298 # Now store another one under another storageindex that has leading
299 # chars the same as the first storageindex.
300 already, writers = self.allocate(ss, "storageindey", [0], 10)
301 for i, wb in writers.items():
302 wb.remote_write(0, "%10d" % i)
304 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
306 new_children_of_storedir = set(os.listdir(storedir))
307 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
309 def test_remove_incoming(self):
310 ss = self.create("test_remove_incoming")
311 already, writers = self.allocate(ss, "vid", range(3), 10)
312 for i,wb in writers.items():
313 wb.remote_write(0, "%10d" % i)
315 incoming_share_dir = wb.incominghome
316 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
317 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
318 incoming_dir = os.path.dirname(incoming_prefix_dir)
319 self.failIf(os.path.exists(incoming_bucket_dir))
320 self.failIf(os.path.exists(incoming_prefix_dir))
321 self.failUnless(os.path.exists(incoming_dir))
323 def test_allocate(self):
324 ss = self.create("test_allocate")
326 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
328 canary = FakeCanary()
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 # now if we ask about writing again, the server should offer those
349 # three buckets as already present. It should offer them even if we
350 # don't ask about those specific ones.
351 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
352 self.failUnlessEqual(already, set([0,1,2]))
353 self.failUnlessEqual(set(writers.keys()), set([3,4]))
355 # while those two buckets are open for writing, the server should
356 # refuse to offer them to uploaders
358 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
359 self.failUnlessEqual(already2, set([0,1,2]))
360 self.failUnlessEqual(set(writers2.keys()), set([5]))
362 # aborting the writes should remove the tempfiles
363 for i,wb in writers2.items():
365 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
366 self.failUnlessEqual(already2, set([0,1,2]))
367 self.failUnlessEqual(set(writers2.keys()), set([5]))
369 for i,wb in writers2.items():
371 for i,wb in writers.items():
374 def test_disconnect(self):
375 # simulate a disconnection
376 ss = self.create("test_disconnect")
377 canary = FakeCanary()
378 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
379 self.failUnlessEqual(already, set())
380 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
381 for (f,args,kwargs) in canary.disconnectors.values():
386 # that ought to delete the incoming shares
387 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
388 self.failUnlessEqual(already, set())
389 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
391 def test_reserved_space(self):
392 ss = self.create("test_reserved_space", reserved_space=10000,
393 klass=FakeDiskStorageServer)
394 # the FakeDiskStorageServer doesn't do real statvfs() calls
396 # 15k available, 10k reserved, leaves 5k for shares
398 # a newly created and filled share incurs this much overhead, beyond
399 # the size we request.
401 LEASE_SIZE = 4+32+32+4
402 canary = FakeCanary(True)
403 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
404 self.failUnlessEqual(len(writers), 3)
405 # now the StorageServer should have 3000 bytes provisionally
406 # allocated, allowing only 2000 more to be claimed
407 self.failUnlessEqual(len(ss._active_writers), 3)
409 # allocating 1001-byte shares only leaves room for one
410 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
411 self.failUnlessEqual(len(writers2), 1)
412 self.failUnlessEqual(len(ss._active_writers), 4)
414 # we abandon the first set, so their provisional allocation should be
418 self.failUnlessEqual(len(ss._active_writers), 1)
419 # now we have a provisional allocation of 1001 bytes
421 # and we close the second set, so their provisional allocation should
422 # become real, long-term allocation, and grows to include the
424 for bw in writers2.values():
425 bw.remote_write(0, "a"*25)
430 self.failUnlessEqual(len(ss._active_writers), 0)
432 allocated = 1001 + OVERHEAD + LEASE_SIZE
434 # we have to manually increase DISKAVAIL, since we're not doing real
436 ss.DISKAVAIL -= allocated
438 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
439 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
440 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
441 self.failUnlessEqual(len(writers3), 39)
442 self.failUnlessEqual(len(ss._active_writers), 39)
446 self.failUnlessEqual(len(ss._active_writers), 0)
447 ss.disownServiceParent()
451 basedir = self.workdir("test_seek_behavior")
452 fileutil.make_dirs(basedir)
453 filename = os.path.join(basedir, "testfile")
454 f = open(filename, "wb")
457 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
458 # files. mode="a" preserves previous contents but does not allow
459 # seeking-to-create-holes. mode="r+" allows both.
460 f = open(filename, "rb+")
464 filelen = os.stat(filename)[stat.ST_SIZE]
465 self.failUnlessEqual(filelen, 100+3)
466 f2 = open(filename, "rb")
467 self.failUnlessEqual(f2.read(5), "start")
470 def test_leases(self):
471 ss = self.create("test_leases")
472 canary = FakeCanary()
476 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
477 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
478 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
479 sharenums, size, canary)
480 self.failUnlessEqual(len(already), 0)
481 self.failUnlessEqual(len(writers), 5)
482 for wb in writers.values():
485 leases = list(ss.get_leases("si0"))
486 self.failUnlessEqual(len(leases), 1)
487 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
489 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
490 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
491 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
492 sharenums, size, canary)
493 for wb in writers.values():
496 # take out a second lease on si1
497 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
498 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
499 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
500 sharenums, size, canary)
501 self.failUnlessEqual(len(already), 5)
502 self.failUnlessEqual(len(writers), 0)
504 leases = list(ss.get_leases("si1"))
505 self.failUnlessEqual(len(leases), 2)
506 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
508 # and a third lease, using add-lease
509 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
510 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
511 ss.remote_add_lease("si1", rs2a, cs2a)
512 leases = list(ss.get_leases("si1"))
513 self.failUnlessEqual(len(leases), 3)
514 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
516 # add-lease on a missing storage index is silently ignored
517 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
519 # check that si0 is readable
520 readers = ss.remote_get_buckets("si0")
521 self.failUnlessEqual(len(readers), 5)
523 # renew the first lease. Only the proper renew_secret should work
524 ss.remote_renew_lease("si0", rs0)
525 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
526 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
528 # check that si0 is still readable
529 readers = ss.remote_get_buckets("si0")
530 self.failUnlessEqual(len(readers), 5)
533 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
534 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
535 ss.remote_cancel_lease("si0", cs0)
537 # si0 should now be gone
538 readers = ss.remote_get_buckets("si0")
539 self.failUnlessEqual(len(readers), 0)
540 # and the renew should no longer work
541 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
544 # cancel the first lease on si1, leaving the second and third in place
545 ss.remote_cancel_lease("si1", cs1)
546 readers = ss.remote_get_buckets("si1")
547 self.failUnlessEqual(len(readers), 5)
548 # the corresponding renew should no longer work
549 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
551 leases = list(ss.get_leases("si1"))
552 self.failUnlessEqual(len(leases), 2)
553 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
555 ss.remote_renew_lease("si1", rs2)
556 # cancelling the second and third should make it go away
557 ss.remote_cancel_lease("si1", cs2)
558 ss.remote_cancel_lease("si1", cs2a)
559 readers = ss.remote_get_buckets("si1")
560 self.failUnlessEqual(len(readers), 0)
561 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
562 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
563 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
565 leases = list(ss.get_leases("si1"))
566 self.failUnlessEqual(len(leases), 0)
569 # test overlapping uploads
570 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
571 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
572 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
573 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
574 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
575 sharenums, size, canary)
576 self.failUnlessEqual(len(already), 0)
577 self.failUnlessEqual(len(writers), 5)
578 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
579 sharenums, size, canary)
580 self.failUnlessEqual(len(already2), 0)
581 self.failUnlessEqual(len(writers2), 0)
582 for wb in writers.values():
585 leases = list(ss.get_leases("si3"))
586 self.failUnlessEqual(len(leases), 1)
588 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
589 sharenums, size, canary)
590 self.failUnlessEqual(len(already3), 5)
591 self.failUnlessEqual(len(writers3), 0)
593 leases = list(ss.get_leases("si3"))
594 self.failUnlessEqual(len(leases), 2)
596 def test_readonly(self):
597 workdir = self.workdir("test_readonly")
598 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
599 ss.setServiceParent(self.sparent)
601 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
602 self.failUnlessEqual(already, set())
603 self.failUnlessEqual(writers, {})
605 stats = ss.get_stats()
606 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
608 if "storage_server.disk_avail" in stats:
609 # windows does not have os.statvfs, so it doesn't give us disk
610 # stats. But if there are stats, readonly_storage means
612 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
614 def test_discard(self):
615 # discard is really only used for other tests, but we test it anyways
616 workdir = self.workdir("test_discard")
617 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
618 ss.setServiceParent(self.sparent)
620 canary = FakeCanary()
621 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
622 self.failUnlessEqual(already, set())
623 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
624 for i,wb in writers.items():
625 wb.remote_write(0, "%25d" % i)
627 # since we discard the data, the shares should be present but sparse.
628 # Since we write with some seeks, the data we read back will be all
630 b = ss.remote_get_buckets("vid")
631 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
632 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
634 def test_advise_corruption(self):
635 workdir = self.workdir("test_advise_corruption")
636 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
637 ss.setServiceParent(self.sparent)
639 si0_s = base32.b2a("si0")
640 ss.remote_advise_corrupt_share("immutable", "si0", 0,
641 "This share smells funny.\n")
642 reportdir = os.path.join(workdir, "corruption-advisories")
643 reports = os.listdir(reportdir)
644 self.failUnlessEqual(len(reports), 1)
645 report_si0 = reports[0]
646 self.failUnless(si0_s in report_si0, report_si0)
647 f = open(os.path.join(reportdir, report_si0), "r")
650 self.failUnless("type: immutable" in report)
651 self.failUnless(("storage_index: %s" % si0_s) in report)
652 self.failUnless("share_number: 0" in report)
653 self.failUnless("This share smells funny." in report)
655 # test the RIBucketWriter version too
656 si1_s = base32.b2a("si1")
657 already,writers = self.allocate(ss, "si1", [1], 75)
658 self.failUnlessEqual(already, set())
659 self.failUnlessEqual(set(writers.keys()), set([1]))
660 writers[1].remote_write(0, "data")
661 writers[1].remote_close()
663 b = ss.remote_get_buckets("si1")
664 self.failUnlessEqual(set(b.keys()), set([1]))
665 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
667 reports = os.listdir(reportdir)
668 self.failUnlessEqual(len(reports), 2)
669 report_si1 = [r for r in reports if si1_s in r][0]
670 f = open(os.path.join(reportdir, report_si1), "r")
673 self.failUnless("type: immutable" in report)
674 self.failUnless(("storage_index: %s" % si1_s) in report)
675 self.failUnless("share_number: 1" in report)
676 self.failUnless("This share tastes like dust." in report)
680 class MutableServer(unittest.TestCase):
683 self.sparent = LoggingServiceParent()
684 self._lease_secret = itertools.count()
686 return self.sparent.stopService()
688 def workdir(self, name):
689 basedir = os.path.join("storage", "MutableServer", name)
692 def create(self, name):
693 workdir = self.workdir(name)
694 ss = StorageServer(workdir, "\x00" * 20)
695 ss.setServiceParent(self.sparent)
698 def test_create(self):
699 ss = self.create("test_create")
701 def write_enabler(self, we_tag):
702 return hashutil.tagged_hash("we_blah", we_tag)
704 def renew_secret(self, tag):
705 return hashutil.tagged_hash("renew_blah", str(tag))
707 def cancel_secret(self, tag):
708 return hashutil.tagged_hash("cancel_blah", str(tag))
710 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
711 write_enabler = self.write_enabler(we_tag)
712 renew_secret = self.renew_secret(lease_tag)
713 cancel_secret = self.cancel_secret(lease_tag)
714 rstaraw = ss.remote_slot_testv_and_readv_and_writev
715 testandwritev = dict( [ (shnum, ([], [], None) )
716 for shnum in sharenums ] )
718 rc = rstaraw(storage_index,
719 (write_enabler, renew_secret, cancel_secret),
722 (did_write, readv_data) = rc
723 self.failUnless(did_write)
724 self.failUnless(isinstance(readv_data, dict))
725 self.failUnlessEqual(len(readv_data), 0)
727 def test_bad_magic(self):
728 ss = self.create("test_bad_magic")
729 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
730 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
735 read = ss.remote_slot_readv
736 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
737 read, "si1", [0], [(0,10)])
738 self.failUnless(" had magic " in str(e), e)
739 self.failUnless(" but we wanted " in str(e), e)
741 def test_container_size(self):
742 ss = self.create("test_container_size")
743 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
745 read = ss.remote_slot_readv
746 rstaraw = ss.remote_slot_testv_and_readv_and_writev
747 secrets = ( self.write_enabler("we1"),
748 self.renew_secret("we1"),
749 self.cancel_secret("we1") )
750 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
751 answer = rstaraw("si1", secrets,
752 {0: ([], [(0,data)], len(data)+12)},
754 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
756 # trying to make the container too large will raise an exception
757 TOOBIG = MutableShareFile.MAX_SIZE + 10
758 self.failUnlessRaises(DataTooLargeError,
759 rstaraw, "si1", secrets,
760 {0: ([], [(0,data)], TOOBIG)},
763 # it should be possible to make the container smaller, although at
764 # the moment this doesn't actually affect the share, unless the
765 # container size is dropped to zero, in which case the share is
767 answer = rstaraw("si1", secrets,
768 {0: ([], [(0,data)], len(data)+8)},
770 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
772 answer = rstaraw("si1", secrets,
773 {0: ([], [(0,data)], 0)},
775 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
777 read_answer = read("si1", [0], [(0,10)])
778 self.failUnlessEqual(read_answer, {})
780 def test_allocate(self):
781 ss = self.create("test_allocate")
782 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
785 read = ss.remote_slot_readv
786 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
788 self.failUnlessEqual(read("si1", [], [(0, 10)]),
789 {0: [""], 1: [""], 2: [""]})
790 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
794 secrets = ( self.write_enabler("we1"),
795 self.renew_secret("we1"),
796 self.cancel_secret("we1") )
797 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
798 write = ss.remote_slot_testv_and_readv_and_writev
799 answer = write("si1", secrets,
800 {0: ([], [(0,data)], None)},
802 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
804 self.failUnlessEqual(read("si1", [0], [(0,20)]),
805 {0: ["00000000001111111111"]})
806 self.failUnlessEqual(read("si1", [0], [(95,10)]),
808 #self.failUnlessEqual(s0.remote_get_length(), 100)
810 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
811 f = self.failUnlessRaises(BadWriteEnablerError,
812 write, "si1", bad_secrets,
814 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
816 # this testv should fail
817 answer = write("si1", secrets,
818 {0: ([(0, 12, "eq", "444444444444"),
819 (20, 5, "eq", "22222"),
826 self.failUnlessEqual(answer, (False,
827 {0: ["000000000011", "22222"],
831 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
834 answer = write("si1", secrets,
835 {0: ([(10, 5, "lt", "11111"),
842 self.failUnlessEqual(answer, (False,
847 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
850 def test_operators(self):
851 # test operators, the data we're comparing is '11111' in all cases.
852 # test both fail+pass, reset data after each one.
853 ss = self.create("test_operators")
855 secrets = ( self.write_enabler("we1"),
856 self.renew_secret("we1"),
857 self.cancel_secret("we1") )
858 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
859 write = ss.remote_slot_testv_and_readv_and_writev
860 read = ss.remote_slot_readv
863 write("si1", secrets,
864 {0: ([], [(0,data)], None)},
870 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
875 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
876 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
877 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
880 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
885 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
886 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
889 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
894 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
895 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
899 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
904 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
905 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
908 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
913 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
914 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
917 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
922 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
923 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
927 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
932 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
933 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
936 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
941 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
942 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
946 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
951 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
952 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
955 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
960 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
961 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
965 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
970 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
971 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
974 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
979 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
980 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
983 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
988 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
989 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
993 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
998 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
999 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1002 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1007 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1008 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1011 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1016 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1017 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1020 # finally, test some operators against empty shares
1021 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1026 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1027 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1030 def test_readv(self):
1031 ss = self.create("test_readv")
1032 secrets = ( self.write_enabler("we1"),
1033 self.renew_secret("we1"),
1034 self.cancel_secret("we1") )
1035 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1036 write = ss.remote_slot_testv_and_readv_and_writev
1037 read = ss.remote_slot_readv
1038 data = [("%d" % i) * 100 for i in range(3)]
1039 rc = write("si1", secrets,
1040 {0: ([], [(0,data[0])], None),
1041 1: ([], [(0,data[1])], None),
1042 2: ([], [(0,data[2])], None),
1044 self.failUnlessEqual(rc, (True, {}))
1046 answer = read("si1", [], [(0, 10)])
1047 self.failUnlessEqual(answer, {0: ["0"*10],
1051 def compare_leases_without_timestamps(self, leases_a, leases_b):
1052 self.failUnlessEqual(len(leases_a), len(leases_b))
1053 for i in range(len(leases_a)):
1056 self.failUnlessEqual(a.owner_num, b.owner_num)
1057 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1058 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1059 self.failUnlessEqual(a.nodeid, b.nodeid)
1061 def compare_leases(self, leases_a, leases_b):
1062 self.failUnlessEqual(len(leases_a), len(leases_b))
1063 for i in range(len(leases_a)):
1066 self.failUnlessEqual(a.owner_num, b.owner_num)
1067 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1068 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1069 self.failUnlessEqual(a.nodeid, b.nodeid)
1070 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1072 def test_leases(self):
1073 ss = self.create("test_leases")
1075 return ( self.write_enabler("we1"),
1076 self.renew_secret("we1-%d" % n),
1077 self.cancel_secret("we1-%d" % n) )
1078 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1079 write = ss.remote_slot_testv_and_readv_and_writev
1080 read = ss.remote_slot_readv
1081 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1082 self.failUnlessEqual(rc, (True, {}))
1084 # create a random non-numeric file in the bucket directory, to
1085 # exercise the code that's supposed to ignore those.
1086 bucket_dir = os.path.join(self.workdir("test_leases"),
1087 "shares", storage_index_to_dir("si1"))
1088 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1089 f.write("you ought to be ignoring me\n")
1092 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1093 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1095 # add-lease on a missing storage index is silently ignored
1096 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1098 # re-allocate the slots and use the same secrets, that should update
1100 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1101 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1104 ss.remote_renew_lease("si1", secrets(0)[1])
1105 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1107 # now allocate them with a bunch of different secrets, to trigger the
1108 # extended lease code. Use add_lease for one of them.
1109 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1110 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1111 secrets2 = secrets(2)
1112 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1113 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1114 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1115 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1116 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1118 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1120 # cancel one of them
1121 ss.remote_cancel_lease("si1", secrets(5)[2])
1122 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1124 all_leases = list(s0.get_leases())
1125 # and write enough data to expand the container, forcing the server
1126 # to move the leases
1127 write("si1", secrets(0),
1128 {0: ([], [(0,data)], 200), },
1131 # read back the leases, make sure they're still intact.
1132 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1134 ss.remote_renew_lease("si1", secrets(0)[1])
1135 ss.remote_renew_lease("si1", secrets(1)[1])
1136 ss.remote_renew_lease("si1", secrets(2)[1])
1137 ss.remote_renew_lease("si1", secrets(3)[1])
1138 ss.remote_renew_lease("si1", secrets(4)[1])
1139 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1140 # get a new copy of the leases, with the current timestamps. Reading
1141 # data and failing to renew/cancel leases should leave the timestamps
1143 all_leases = list(s0.get_leases())
1144 # renewing with a bogus token should prompt an error message
1146 # examine the exception thus raised, make sure the old nodeid is
1147 # present, to provide for share migration
1148 e = self.failUnlessRaises(IndexError,
1149 ss.remote_renew_lease, "si1",
1152 self.failUnless("Unable to renew non-existent lease" in e_s)
1153 self.failUnless("I have leases accepted by nodeids:" in e_s)
1154 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1156 # same for cancelling
1157 self.failUnlessRaises(IndexError,
1158 ss.remote_cancel_lease, "si1",
1160 self.compare_leases(all_leases, list(s0.get_leases()))
1162 # reading shares should not modify the timestamp
1163 read("si1", [], [(0,200)])
1164 self.compare_leases(all_leases, list(s0.get_leases()))
1166 write("si1", secrets(0),
1167 {0: ([], [(200, "make me bigger")], None)}, [])
1168 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1170 write("si1", secrets(0),
1171 {0: ([], [(500, "make me really bigger")], None)}, [])
1172 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1174 # now cancel them all
1175 ss.remote_cancel_lease("si1", secrets(0)[2])
1176 ss.remote_cancel_lease("si1", secrets(1)[2])
1177 ss.remote_cancel_lease("si1", secrets(2)[2])
1178 ss.remote_cancel_lease("si1", secrets(3)[2])
1180 # the slot should still be there
1181 remaining_shares = read("si1", [], [(0,10)])
1182 self.failUnlessEqual(len(remaining_shares), 1)
1183 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1185 # cancelling a non-existent lease should raise an IndexError
1186 self.failUnlessRaises(IndexError,
1187 ss.remote_cancel_lease, "si1", "nonsecret")
1189 # and the slot should still be there
1190 remaining_shares = read("si1", [], [(0,10)])
1191 self.failUnlessEqual(len(remaining_shares), 1)
1192 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1194 ss.remote_cancel_lease("si1", secrets(4)[2])
1195 # now the slot should be gone
1196 no_shares = read("si1", [], [(0,10)])
1197 self.failUnlessEqual(no_shares, {})
1199 # cancelling a lease on a non-existent share should raise an IndexError
1200 self.failUnlessRaises(IndexError,
1201 ss.remote_cancel_lease, "si2", "nonsecret")
1203 def test_remove(self):
1204 ss = self.create("test_remove")
1205 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1207 readv = ss.remote_slot_readv
1208 writev = ss.remote_slot_testv_and_readv_and_writev
1209 secrets = ( self.write_enabler("we1"),
1210 self.renew_secret("we1"),
1211 self.cancel_secret("we1") )
1212 # delete sh0 by setting its size to zero
1213 answer = writev("si1", secrets,
1216 # the answer should mention all the shares that existed before the
1218 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1219 # but a new read should show only sh1 and sh2
1220 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1223 # delete sh1 by setting its size to zero
1224 answer = writev("si1", secrets,
1227 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1228 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1231 # delete sh2 by setting its size to zero
1232 answer = writev("si1", secrets,
1235 self.failUnlessEqual(answer, (True, {2:[]}) )
1236 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1238 # and the bucket directory should now be gone
1239 si = base32.b2a("si1")
1240 # note: this is a detail of the storage server implementation, and
1241 # may change in the future
1243 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1244 bucketdir = os.path.join(prefixdir, si)
1245 self.failUnless(os.path.exists(prefixdir))
1246 self.failIf(os.path.exists(bucketdir))
1248 class Stats(unittest.TestCase):
1251 self.sparent = LoggingServiceParent()
1252 self._lease_secret = itertools.count()
1254 return self.sparent.stopService()
1256 def workdir(self, name):
1257 basedir = os.path.join("storage", "Server", name)
1260 def create(self, name):
1261 workdir = self.workdir(name)
1262 ss = StorageServer(workdir, "\x00" * 20)
1263 ss.setServiceParent(self.sparent)
1266 def test_latencies(self):
1267 ss = self.create("test_latencies")
1268 for i in range(10000):
1269 ss.add_latency("allocate", 1.0 * i)
1270 for i in range(1000):
1271 ss.add_latency("renew", 1.0 * i)
1273 ss.add_latency("cancel", 2.0 * i)
1274 ss.add_latency("get", 5.0)
1276 output = ss.get_latencies()
1278 self.failUnlessEqual(sorted(output.keys()),
1279 sorted(["allocate", "renew", "cancel", "get"]))
1280 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1281 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1282 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1283 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1284 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1285 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1286 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1287 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1288 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1290 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1291 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1292 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1293 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1294 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1295 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1296 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1297 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1298 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1300 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1301 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1302 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1303 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1304 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1305 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1306 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1307 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1308 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1310 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1311 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1312 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1313 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1314 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1315 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1316 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1317 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1318 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1321 s = re.sub(r'<[^>]*>', ' ', s)
1322 s = re.sub(r'\s+', ' ', s)
1325 class MyBucketCountingCrawler(BucketCountingCrawler):
1326 def finished_prefix(self, cycle, prefix):
1327 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1329 d = self.hook_ds.pop(0)
1332 class MyStorageServer(StorageServer):
1333 def add_bucket_counter(self):
1334 statefile = os.path.join(self.storedir, "bucket_counter.state")
1335 self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1336 self.bucket_counter.setServiceParent(self)
1338 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1341 self.s = service.MultiService()
1342 self.s.startService()
1344 return self.s.stopService()
1346 def test_bucket_counter(self):
1347 basedir = "storage/BucketCounter/bucket_counter"
1348 fileutil.make_dirs(basedir)
1349 ss = StorageServer(basedir, "\x00" * 20)
1350 # to make sure we capture the bucket-counting-crawler in the middle
1351 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1352 # also make it start sooner than usual.
1353 ss.bucket_counter.slow_start = 0
1354 orig_cpu_slice = ss.bucket_counter.cpu_slice
1355 ss.bucket_counter.cpu_slice = 0
1356 ss.setServiceParent(self.s)
1358 w = StorageStatus(ss)
1360 # this sample is before the crawler has started doing anything
1361 html = w.renderSynchronously()
1362 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1363 s = remove_tags(html)
1364 self.failUnless("Accepting new shares: Yes" in s, s)
1365 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1366 self.failUnless("Total buckets: Not computed yet" in s, s)
1367 self.failUnless("Next crawl in" in s, s)
1369 # give the bucket-counting-crawler one tick to get started. The
1370 # cpu_slice=0 will force it to yield right after it processes the
1373 d = eventual.fireEventually()
1374 def _check(ignored):
1375 # are we really right after the first prefix?
1376 state = ss.bucket_counter.get_state()
1377 self.failUnlessEqual(state["last-complete-prefix"],
1378 ss.bucket_counter.prefixes[0])
1379 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1380 html = w.renderSynchronously()
1381 s = remove_tags(html)
1382 self.failUnless(" Current crawl " in s, s)
1383 self.failUnless(" (next work in " in s, s)
1384 d.addCallback(_check)
1386 # now give it enough time to complete a full cycle
1388 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1389 d.addCallback(lambda ignored: self.poll(_watch))
1390 def _check2(ignored):
1391 ss.bucket_counter.cpu_slice = orig_cpu_slice
1392 html = w.renderSynchronously()
1393 s = remove_tags(html)
1394 self.failUnless("Total buckets: 0 (the number of" in s, s)
1395 self.failUnless("Next crawl in 59 minutes" in s, s)
1396 d.addCallback(_check2)
1399 def test_bucket_counter_cleanup(self):
1400 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1401 fileutil.make_dirs(basedir)
1402 ss = StorageServer(basedir, "\x00" * 20)
1403 # to make sure we capture the bucket-counting-crawler in the middle
1404 # of a cycle, we reach in and reduce its maximum slice time to 0.
1405 ss.bucket_counter.slow_start = 0
1406 orig_cpu_slice = ss.bucket_counter.cpu_slice
1407 ss.bucket_counter.cpu_slice = 0
1408 ss.setServiceParent(self.s)
1410 d = eventual.fireEventually()
1412 def _after_first_prefix(ignored):
1413 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1414 # now sneak in and mess with its state, to make sure it cleans up
1415 # properly at the end of the cycle
1416 state = ss.bucket_counter.state
1417 self.failUnlessEqual(state["last-complete-prefix"],
1418 ss.bucket_counter.prefixes[0])
1419 state["bucket-counts"][-12] = {}
1420 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1421 ss.bucket_counter.save_state()
1422 d.addCallback(_after_first_prefix)
1424 # now give it enough time to complete a cycle
1426 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1427 d.addCallback(lambda ignored: self.poll(_watch))
1428 def _check2(ignored):
1429 ss.bucket_counter.cpu_slice = orig_cpu_slice
1430 s = ss.bucket_counter.get_state()
1431 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1432 self.failIf("bogusprefix!" in s["storage-index-samples"],
1433 s["storage-index-samples"].keys())
1434 d.addCallback(_check2)
1437 def test_bucket_counter_eta(self):
1438 basedir = "storage/BucketCounter/bucket_counter_eta"
1439 fileutil.make_dirs(basedir)
1440 ss = MyStorageServer(basedir, "\x00" * 20)
1441 ss.bucket_counter.slow_start = 0
1442 # these will be fired inside finished_prefix()
1443 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1444 w = StorageStatus(ss)
1446 d = defer.Deferred()
1448 def _check_1(ignored):
1449 # no ETA is available yet
1450 html = w.renderSynchronously()
1451 s = remove_tags(html)
1452 self.failUnlessIn("complete (next work", s)
1454 def _check_2(ignored):
1455 # one prefix has finished, so an ETA based upon that elapsed time
1456 # should be available.
1457 html = w.renderSynchronously()
1458 s = remove_tags(html)
1459 self.failUnlessIn("complete (ETA ", s)
1461 def _check_3(ignored):
1462 # two prefixes have finished
1463 html = w.renderSynchronously()
1464 s = remove_tags(html)
1465 self.failUnlessIn("complete (ETA ", s)
1468 hooks[0].addCallback(_check_1).addErrback(d.errback)
1469 hooks[1].addCallback(_check_2).addErrback(d.errback)
1470 hooks[2].addCallback(_check_3).addErrback(d.errback)
1472 ss.setServiceParent(self.s)
1475 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1476 stop_after_first_bucket = False
1477 def process_bucket(self, *args, **kwargs):
1478 LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1479 if self.stop_after_first_bucket:
1480 self.stop_after_first_bucket = False
1481 self.cpu_slice = -1.0
1482 def yielding(self, sleep_time):
1483 if not self.stop_after_first_bucket:
1484 self.cpu_slice = 500
1486 class BrokenStatResults:
1488 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1491 bsr = BrokenStatResults()
1492 for attrname in dir(s):
1493 if attrname.startswith("_"):
1495 if attrname == "st_blocks":
1497 setattr(bsr, attrname, getattr(s, attrname))
1500 class InstrumentedStorageServer(StorageServer):
1501 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1502 class No_ST_BLOCKS_StorageServer(StorageServer):
1503 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1505 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1508 self.s = service.MultiService()
1509 self.s.startService()
1511 return self.s.stopService()
1513 def make_shares(self, ss):
1515 return (si, hashutil.tagged_hash("renew", si),
1516 hashutil.tagged_hash("cancel", si))
1517 def make_mutable(si):
1518 return (si, hashutil.tagged_hash("renew", si),
1519 hashutil.tagged_hash("cancel", si),
1520 hashutil.tagged_hash("write-enabler", si))
1521 def make_extra_lease(si, num):
1522 return (hashutil.tagged_hash("renew-%d" % num, si),
1523 hashutil.tagged_hash("cancel-%d" % num, si))
1525 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1526 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1527 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1528 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1529 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1530 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1532 canary = FakeCanary()
1533 # note: 'tahoe debug dump-share' will not handle this file, since the
1534 # inner contents are not a valid CHK share
1535 data = "\xff" * 1000
1537 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1539 w[0].remote_write(0, data)
1542 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1544 w[0].remote_write(0, data)
1546 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1548 writev = ss.remote_slot_testv_and_readv_and_writev
1549 writev(mutable_si_2, (we2, rs2, cs2),
1550 {0: ([], [(0,data)], len(data))}, [])
1551 writev(mutable_si_3, (we3, rs3, cs3),
1552 {0: ([], [(0,data)], len(data))}, [])
1553 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1555 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1556 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1557 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1559 def test_basic(self):
1560 basedir = "storage/LeaseCrawler/basic"
1561 fileutil.make_dirs(basedir)
1562 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1563 # make it start sooner than usual.
1564 lc = ss.lease_checker
1567 lc.stop_after_first_bucket = True
1568 webstatus = StorageStatus(ss)
1570 # create a few shares, with some leases on them
1571 self.make_shares(ss)
1572 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1574 # add a non-sharefile to exercise another code path
1575 fn = os.path.join(ss.sharedir,
1576 storage_index_to_dir(immutable_si_0),
1579 f.write("I am not a share.\n")
1582 # this is before the crawl has started, so we're not in a cycle yet
1583 initial_state = lc.get_state()
1584 self.failIf(lc.get_progress()["cycle-in-progress"])
1585 self.failIf("cycle-to-date" in initial_state)
1586 self.failIf("estimated-remaining-cycle" in initial_state)
1587 self.failIf("estimated-current-cycle" in initial_state)
1588 self.failUnless("history" in initial_state)
1589 self.failUnlessEqual(initial_state["history"], {})
1591 ss.setServiceParent(self.s)
1593 d = eventual.fireEventually()
1595 # now examine the state right after the first bucket has been
1597 def _after_first_bucket(ignored):
1598 initial_state = lc.get_state()
1599 self.failUnless("cycle-to-date" in initial_state)
1600 self.failUnless("estimated-remaining-cycle" in initial_state)
1601 self.failUnless("estimated-current-cycle" in initial_state)
1602 self.failUnless("history" in initial_state)
1603 self.failUnlessEqual(initial_state["history"], {})
1605 so_far = initial_state["cycle-to-date"]
1606 self.failUnlessEqual(so_far["expiration-enabled"], False)
1607 self.failUnless("configured-expiration-time" in so_far)
1608 self.failUnless("lease-age-histogram" in so_far)
1609 lah = so_far["lease-age-histogram"]
1610 self.failUnlessEqual(type(lah), list)
1611 self.failUnlessEqual(len(lah), 1)
1612 self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 1) ] )
1613 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1614 self.failUnlessEqual(so_far["buckets-examined"], 1)
1615 self.failUnlessEqual(so_far["shares-examined"], 1)
1616 sr1 = so_far["space-recovered"]
1617 self.failUnlessEqual(sr1["actual-numshares"], 0)
1618 self.failUnlessEqual(sr1["configured-leasetimer-diskbytes"], 0)
1619 self.failUnlessEqual(sr1["original-leasetimer-sharebytes"], 0)
1620 left = initial_state["estimated-remaining-cycle"]
1621 self.failUnless(left["buckets-examined"] > 0,
1622 left["buckets-examined"])
1623 self.failUnless(left["shares-examined"] > 0,
1624 left["shares-examined"])
1625 sr2 = left["space-recovered"]
1626 self.failIfEqual(sr2["actual-numshares"], None)
1627 self.failIfEqual(sr2["configured-leasetimer-diskbytes"], None)
1628 self.failIfEqual(sr2["original-leasetimer-sharebytes"], None)
1629 d.addCallback(_after_first_bucket)
1630 d.addCallback(lambda ign: self.render1(webstatus))
1631 def _check_html_in_cycle(html):
1632 s = remove_tags(html)
1633 self.failUnlessIn("So far, this cycle has examined "
1634 "1 shares in 1 buckets "
1635 "and has recovered: "
1636 "0 shares, 0 buckets, 0 B ", s)
1637 self.failUnlessIn("If expiration were enabled, "
1638 "we would have recovered: "
1639 "0 shares, 0 buckets, 0 B by now", s)
1640 self.failUnlessIn("and the remainder of this cycle "
1641 "would probably recover: "
1642 "0 shares, 0 buckets, 0 B ", s)
1643 self.failUnlessIn("and the whole cycle would probably recover: "
1644 "0 shares, 0 buckets, 0 B ", s)
1645 self.failUnlessIn("if we were using each lease's default "
1646 "31-day lease lifetime", s)
1647 self.failUnlessIn("this cycle would be expected to recover: ", s)
1648 d.addCallback(_check_html_in_cycle)
1650 # wait for the crawler to finish the first cycle. Nothing should have
1653 return bool(lc.get_state()["last-cycle-finished"] is not None)
1654 d.addCallback(lambda ign: self.poll(_wait))
1656 def _after_first_cycle(ignored):
1658 self.failIf("cycle-to-date" in s)
1659 self.failIf("estimated-remaining-cycle" in s)
1660 self.failIf("estimated-current-cycle" in s)
1661 last = s["history"][0]
1662 self.failUnless("cycle-start-finish-times" in last)
1663 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1664 self.failUnlessEqual(last["expiration-enabled"], False)
1665 self.failUnless("configured-expiration-time" in last)
1667 self.failUnless("lease-age-histogram" in last)
1668 lah = last["lease-age-histogram"]
1669 self.failUnlessEqual(type(lah), list)
1670 self.failUnlessEqual(len(lah), 1)
1671 self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 6) ] )
1673 self.failUnlessEqual(last["leases-per-share-histogram"],
1675 self.failUnlessEqual(last["buckets-examined"], 4)
1676 self.failUnlessEqual(last["shares-examined"], 4)
1678 rec = last["space-recovered"]
1679 self.failUnlessEqual(rec["actual-numbuckets"], 0)
1680 self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0)
1681 self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 0)
1682 self.failUnlessEqual(rec["actual-numshares"], 0)
1683 self.failUnlessEqual(rec["original-leasetimer-numshares"], 0)
1684 self.failUnlessEqual(rec["configured-leasetimer-numshares"], 0)
1685 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1686 self.failUnlessEqual(rec["original-leasetimer-diskbytes"], 0)
1687 self.failUnlessEqual(rec["configured-leasetimer-diskbytes"], 0)
1688 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1689 self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0)
1690 self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], 0)
1692 def _get_sharefile(si):
1693 return list(ss._iter_share_files(si))[0]
1694 def count_leases(si):
1695 return len(list(_get_sharefile(si).get_leases()))
1696 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1697 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1698 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1699 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1700 d.addCallback(_after_first_cycle)
1701 d.addCallback(lambda ign: self.render1(webstatus))
1702 def _check_html(html):
1703 s = remove_tags(html)
1704 self.failUnlessIn("recovered: 0 shares, 0 buckets, 0 B "
1705 "but expiration was not enabled", s)
1706 d.addCallback(_check_html)
1709 def backdate_lease(self, sf, renew_secret, new_expire_time):
1710 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1711 # "renew" a lease with a new_expire_time that is older than what the
1712 # current lease has), so we have to reach inside it.
1713 for i,lease in enumerate(sf.get_leases()):
1714 if lease.renew_secret == renew_secret:
1715 lease.expiration_time = new_expire_time
1716 f = open(sf.home, 'rb+')
1717 sf._write_lease_record(f, i, lease)
1720 raise IndexError("unable to renew non-existent lease")
1722 def test_expire(self):
1723 basedir = "storage/LeaseCrawler/expire"
1724 fileutil.make_dirs(basedir)
1725 # setting expiration_time to 2000 means that any lease which is more
1726 # than 2000s old will be expired.
1727 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1729 expiration_time=2000)
1730 # make it start sooner than usual.
1731 lc = ss.lease_checker
1733 lc.stop_after_first_bucket = True
1734 webstatus = StorageStatus(ss)
1736 # create a few shares, with some leases on them
1737 self.make_shares(ss)
1738 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1740 def count_shares(si):
1741 return len(list(ss._iter_share_files(si)))
1742 def _get_sharefile(si):
1743 return list(ss._iter_share_files(si))[0]
1744 def count_leases(si):
1745 return len(list(_get_sharefile(si).get_leases()))
1747 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1748 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1749 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1750 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1751 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1752 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1753 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1754 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1756 # artificially crank back the expiration time on the first lease of
1757 # each share, to make it look like it expired already (age=1000s).
1758 # Some shares have an extra lease which is set to expire at the
1759 # default time in 31 days from now (age=31days). We then run the
1760 # crawler, which will expire the first lease, making some shares get
1761 # deleted and others stay alive (with one remaining lease)
1764 sf0 = _get_sharefile(immutable_si_0)
1765 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1766 sf0_size = os.stat(sf0.home).st_size
1768 # immutable_si_1 gets an extra lease
1769 sf1 = _get_sharefile(immutable_si_1)
1770 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1772 sf2 = _get_sharefile(mutable_si_2)
1773 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1774 sf2_size = os.stat(sf2.home).st_size
1776 # mutable_si_3 gets an extra lease
1777 sf3 = _get_sharefile(mutable_si_3)
1778 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1780 ss.setServiceParent(self.s)
1782 d = eventual.fireEventually()
1783 # examine the state right after the first bucket has been processed
1784 def _after_first_bucket(ignored):
1785 p = lc.get_progress()
1786 self.failUnless(p["cycle-in-progress"])
1787 d.addCallback(_after_first_bucket)
1788 d.addCallback(lambda ign: self.render1(webstatus))
1789 def _check_html_in_cycle(html):
1790 s = remove_tags(html)
1791 # the first bucket encountered gets deleted, and its prefix
1792 # happens to be about 1/6th of the way through the ring, so the
1793 # predictor thinks we'll have 6 shares and that we'll delete them
1794 # all. This part of the test depends upon the SIs landing right
1795 # where they do now.
1796 self.failUnlessIn("The remainder of this cycle is expected to "
1797 "recover: 5 shares, 5 buckets", s)
1798 self.failUnlessIn("The whole cycle is expected to examine "
1799 "6 shares in 6 buckets and to recover: "
1800 "6 shares, 6 buckets", s)
1801 d.addCallback(_check_html_in_cycle)
1803 # wait for the crawler to finish the first cycle. Two shares should
1806 return bool(lc.get_state()["last-cycle-finished"] is not None)
1807 d.addCallback(lambda ign: self.poll(_wait))
1809 def _after_first_cycle(ignored):
1810 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1811 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1812 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1813 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1814 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1815 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1818 last = s["history"][0]
1820 self.failUnlessEqual(last["expiration-enabled"], True)
1821 self.failUnlessEqual(last["configured-expiration-time"], 2000)
1822 self.failUnlessEqual(last["buckets-examined"], 4)
1823 self.failUnlessEqual(last["shares-examined"], 4)
1824 self.failUnlessEqual(last["leases-per-share-histogram"],
1827 rec = last["space-recovered"]
1828 self.failUnlessEqual(rec["actual-numbuckets"], 2)
1829 self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 2)
1830 self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2)
1831 self.failUnlessEqual(rec["actual-numshares"], 2)
1832 self.failUnlessEqual(rec["original-leasetimer-numshares"], 2)
1833 self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2)
1834 size = sf0_size + sf2_size
1835 self.failUnlessEqual(rec["actual-sharebytes"], size)
1836 self.failUnlessEqual(rec["original-leasetimer-sharebytes"], size)
1837 self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size)
1838 # different platforms have different notions of "blocks used by
1839 # this file", so merely assert that it's a number
1840 self.failUnless(rec["actual-diskbytes"] >= 0,
1841 rec["actual-diskbytes"])
1842 self.failUnless(rec["original-leasetimer-diskbytes"] >= 0,
1843 rec["original-leasetimer-diskbytes"])
1844 self.failUnless(rec["configured-leasetimer-diskbytes"] >= 0,
1845 rec["configured-leasetimer-diskbytes"])
1846 d.addCallback(_after_first_cycle)
1847 d.addCallback(lambda ign: self.render1(webstatus))
1848 def _check_html(html):
1849 s = remove_tags(html)
1850 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1851 self.failUnlessIn(" recovered: 2 shares, 2 buckets, ", s)
1852 d.addCallback(_check_html)
1855 def test_limited_history(self):
1856 basedir = "storage/LeaseCrawler/limited_history"
1857 fileutil.make_dirs(basedir)
1858 ss = StorageServer(basedir, "\x00" * 20)
1859 # make it start sooner than usual.
1860 lc = ss.lease_checker
1864 # create a few shares, with some leases on them
1865 self.make_shares(ss)
1867 ss.setServiceParent(self.s)
1869 def _wait_until_15_cycles_done():
1870 last = lc.state["last-cycle-finished"]
1871 if last is not None and last >= 15:
1876 d = self.poll(_wait_until_15_cycles_done)
1878 def _check(ignored):
1881 self.failUnlessEqual(len(h), 10)
1882 self.failUnlessEqual(max(h.keys()), 15)
1883 self.failUnlessEqual(min(h.keys()), 6)
1884 d.addCallback(_check)
1887 def test_unpredictable_future(self):
1888 basedir = "storage/LeaseCrawler/unpredictable_future"
1889 fileutil.make_dirs(basedir)
1890 ss = StorageServer(basedir, "\x00" * 20)
1891 # make it start sooner than usual.
1892 lc = ss.lease_checker
1894 lc.cpu_slice = -1.0 # stop quickly
1896 self.make_shares(ss)
1898 ss.setServiceParent(self.s)
1900 d = eventual.fireEventually()
1901 def _check(ignored):
1902 # this should fire after the first bucket is complete, but before
1903 # the first prefix is complete, so the progress-measurer won't
1904 # think we've gotten far enough to raise our percent-complete
1905 # above 0%, triggering the cannot-predict-the-future code in
1906 # expirer.py . This will have to change if/when the
1907 # progress-measurer gets smart enough to count buckets (we'll
1908 # have to interrupt it even earlier, before it's finished the
1911 self.failUnless("cycle-to-date" in s)
1912 self.failUnless("estimated-remaining-cycle" in s)
1913 self.failUnless("estimated-current-cycle" in s)
1915 left = s["estimated-remaining-cycle"]["space-recovered"]
1916 self.failUnlessEqual(left["actual-numbuckets"], None)
1917 self.failUnlessEqual(left["original-leasetimer-numbuckets"], None)
1918 self.failUnlessEqual(left["configured-leasetimer-numbuckets"], None)
1919 self.failUnlessEqual(left["actual-numshares"], None)
1920 self.failUnlessEqual(left["original-leasetimer-numshares"], None)
1921 self.failUnlessEqual(left["configured-leasetimer-numshares"], None)
1922 self.failUnlessEqual(left["actual-diskbytes"], None)
1923 self.failUnlessEqual(left["original-leasetimer-diskbytes"], None)
1924 self.failUnlessEqual(left["configured-leasetimer-diskbytes"], None)
1925 self.failUnlessEqual(left["actual-sharebytes"], None)
1926 self.failUnlessEqual(left["original-leasetimer-sharebytes"], None)
1927 self.failUnlessEqual(left["configured-leasetimer-sharebytes"], None)
1929 full = s["estimated-remaining-cycle"]["space-recovered"]
1930 self.failUnlessEqual(full["actual-numbuckets"], None)
1931 self.failUnlessEqual(full["original-leasetimer-numbuckets"], None)
1932 self.failUnlessEqual(full["configured-leasetimer-numbuckets"], None)
1933 self.failUnlessEqual(full["actual-numshares"], None)
1934 self.failUnlessEqual(full["original-leasetimer-numshares"], None)
1935 self.failUnlessEqual(full["configured-leasetimer-numshares"], None)
1936 self.failUnlessEqual(full["actual-diskbytes"], None)
1937 self.failUnlessEqual(full["original-leasetimer-diskbytes"], None)
1938 self.failUnlessEqual(full["configured-leasetimer-diskbytes"], None)
1939 self.failUnlessEqual(full["actual-sharebytes"], None)
1940 self.failUnlessEqual(full["original-leasetimer-sharebytes"], None)
1941 self.failUnlessEqual(full["configured-leasetimer-sharebytes"], None)
1943 d.addCallback(_check)
1946 def test_no_st_blocks(self):
1947 basedir = "storage/LeaseCrawler/no_st_blocks"
1948 fileutil.make_dirs(basedir)
1949 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
1950 expiration_time=-1000)
1951 # a negative expiration_time= means the "configured-leasetimer-"
1952 # space-recovered counts will be non-zero, since all shares will have
1955 # make it start sooner than usual.
1956 lc = ss.lease_checker
1959 self.make_shares(ss)
1960 ss.setServiceParent(self.s)
1962 return bool(lc.get_state()["last-cycle-finished"] is not None)
1963 d = self.poll(_wait)
1965 def _check(ignored):
1967 last = s["history"][0]
1968 rec = last["space-recovered"]
1969 self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 4)
1970 self.failUnlessEqual(rec["configured-leasetimer-numshares"], 4)
1971 self.failUnless(rec["configured-leasetimer-sharebytes"] > 0,
1972 rec["configured-leasetimer-sharebytes"])
1973 # without the .st_blocks field in os.stat() results, we should be
1974 # reporting diskbytes==sharebytes
1975 self.failUnlessEqual(rec["configured-leasetimer-sharebytes"],
1976 rec["configured-leasetimer-diskbytes"])
1977 d.addCallback(_check)
1980 class NoStatvfsServer(StorageServer):
1981 def do_statvfs(self):
1982 raise AttributeError
1984 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1987 self.s = service.MultiService()
1988 self.s.startService()
1990 return self.s.stopService()
1992 def test_no_server(self):
1993 w = StorageStatus(None)
1994 html = w.renderSynchronously()
1995 self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1997 def test_status(self):
1998 basedir = "storage/WebStatus/status"
1999 fileutil.make_dirs(basedir)
2000 ss = StorageServer(basedir, "\x00" * 20)
2001 ss.setServiceParent(self.s)
2002 w = StorageStatus(ss)
2004 def _check_html(html):
2005 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2006 s = remove_tags(html)
2007 self.failUnless("Accepting new shares: Yes" in s, s)
2008 self.failUnless("Reserved space: - 0 B (0)" in s, s)
2009 d.addCallback(_check_html)
2010 d.addCallback(lambda ign: self.render_json(w))
2011 def _check_json(json):
2012 data = simplejson.loads(json)
2014 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2015 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2016 self.failUnless("bucket-counter" in data)
2017 self.failUnless("lease-checker" in data)
2018 d.addCallback(_check_json)
2021 def render_json(self, page):
2022 d = self.render1(page, args={"t": ["json"]})
2025 def test_status_no_statvfs(self):
2026 # windows has no os.statvfs . Make sure the code handles that even on
2028 basedir = "storage/WebStatus/status_no_statvfs"
2029 fileutil.make_dirs(basedir)
2030 ss = NoStatvfsServer(basedir, "\x00" * 20)
2031 ss.setServiceParent(self.s)
2032 w = StorageStatus(ss)
2033 html = w.renderSynchronously()
2034 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2035 s = remove_tags(html)
2036 self.failUnless("Accepting new shares: Yes" in s, s)
2037 self.failUnless("Total disk space: ?" in s, s)
2039 def test_readonly(self):
2040 basedir = "storage/WebStatus/readonly"
2041 fileutil.make_dirs(basedir)
2042 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2043 ss.setServiceParent(self.s)
2044 w = StorageStatus(ss)
2045 html = w.renderSynchronously()
2046 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2047 s = remove_tags(html)
2048 self.failUnless("Accepting new shares: No" in s, s)
2050 def test_reserved(self):
2051 basedir = "storage/WebStatus/reserved"
2052 fileutil.make_dirs(basedir)
2053 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2054 ss.setServiceParent(self.s)
2055 w = StorageStatus(ss)
2056 html = w.renderSynchronously()
2057 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2058 s = remove_tags(html)
2059 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2061 def test_huge_reserved(self):
2062 basedir = "storage/WebStatus/reserved"
2063 fileutil.make_dirs(basedir)
2064 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2065 ss.setServiceParent(self.s)
2066 w = StorageStatus(ss)
2067 html = w.renderSynchronously()
2068 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2069 s = remove_tags(html)
2070 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2072 def test_util(self):
2073 w = StorageStatus(None)
2074 self.failUnlessEqual(w.render_space(None, None), "?")
2075 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2076 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2077 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2078 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2079 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)