2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
51 def make_workdir(self, name):
52 basedir = os.path.join("storage", "Bucket", name)
53 incoming = os.path.join(basedir, "tmp", "bucket")
54 final = os.path.join(basedir, "bucket")
55 fileutil.make_dirs(basedir)
56 fileutil.make_dirs(os.path.join(basedir, "tmp"))
57 return incoming, final
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnless("to peer" in repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
222 def test_readwrite_v1(self):
223 return self._do_test_readwrite("test_readwrite_v1",
224 0x24, WriteBucketProxy, ReadBucketProxy)
226 def test_readwrite_v2(self):
227 return self._do_test_readwrite("test_readwrite_v2",
228 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    """StorageServer double whose free-disk probe returns a canned value.

    Tests assign the DISKAVAIL attribute to simulate a particular amount of
    available space without consulting the real filesystem.
    """
    def stat_disk(self, d):
        # The directory argument is ignored; just report the canned figure.
        return self.DISKAVAIL
234 class Server(unittest.TestCase):
237 self.sparent = LoggingServiceParent()
238 self.sparent.startService()
239 self._lease_secret = itertools.count()
241 return self.sparent.stopService()
243 def workdir(self, name):
244 basedir = os.path.join("storage", "Server", name)
247 def create(self, name, reserved_space=0, klass=StorageServer):
248 workdir = self.workdir(name)
249 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
250 stats_provider=FakeStatsProvider())
251 ss.setServiceParent(self.sparent)
254 def test_create(self):
255 ss = self.create("test_create")
257 def allocate(self, ss, storage_index, sharenums, size, canary=None):
258 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
261 canary = FakeCanary()
262 return ss.remote_allocate_buckets(storage_index,
263 renew_secret, cancel_secret,
264 sharenums, size, canary)
266 def test_large_share(self):
267 ss = self.create("test_large_share")
269 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
270 self.failUnlessEqual(already, set())
271 self.failUnlessEqual(set(writers.keys()), set([0]))
273 shnum, bucket = writers.items()[0]
274 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
275 bucket.remote_write(2**32, "ab")
276 bucket.remote_close()
278 readers = ss.remote_get_buckets("allocate")
279 reader = readers[shnum]
280 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
281 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
283 def test_dont_overfill_dirs(self):
285 This test asserts that if you add a second share whose storage index
286 share lots of leading bits with an extant share (but isn't the exact
287 same storage index), this won't add an entry to the share directory.
289 ss = self.create("test_dont_overfill_dirs")
290 already, writers = self.allocate(ss, "storageindex", [0], 10)
291 for i, wb in writers.items():
292 wb.remote_write(0, "%10d" % i)
294 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
296 children_of_storedir = set(os.listdir(storedir))
298 # Now store another one under another storageindex that has leading
299 # chars the same as the first storageindex.
300 already, writers = self.allocate(ss, "storageindey", [0], 10)
301 for i, wb in writers.items():
302 wb.remote_write(0, "%10d" % i)
304 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
306 new_children_of_storedir = set(os.listdir(storedir))
307 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
309 def test_remove_incoming(self):
310 ss = self.create("test_remove_incoming")
311 already, writers = self.allocate(ss, "vid", range(3), 10)
312 for i,wb in writers.items():
313 wb.remote_write(0, "%10d" % i)
315 incoming_share_dir = wb.incominghome
316 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
317 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
318 incoming_dir = os.path.dirname(incoming_prefix_dir)
319 self.failIf(os.path.exists(incoming_bucket_dir))
320 self.failIf(os.path.exists(incoming_prefix_dir))
321 self.failUnless(os.path.exists(incoming_dir))
323 def test_allocate(self):
324 ss = self.create("test_allocate")
326 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
328 canary = FakeCanary()
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 self.failUnless("BucketReader" in b_str, b_str)
349 self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
351 # now if we ask about writing again, the server should offer those
352 # three buckets as already present. It should offer them even if we
353 # don't ask about those specific ones.
354 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355 self.failUnlessEqual(already, set([0,1,2]))
356 self.failUnlessEqual(set(writers.keys()), set([3,4]))
358 # while those two buckets are open for writing, the server should
359 # refuse to offer them to uploaders
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 # aborting the writes should remove the tempfiles
366 for i,wb in writers2.items():
368 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369 self.failUnlessEqual(already2, set([0,1,2]))
370 self.failUnlessEqual(set(writers2.keys()), set([5]))
372 for i,wb in writers2.items():
374 for i,wb in writers.items():
377 def test_bad_container_version(self):
378 ss = self.create("test_bad_container_version")
379 a,w = self.allocate(ss, "si1", [0], 10)
380 w[0].remote_write(0, "\xff"*10)
383 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389 b = ss.remote_get_buckets("allocate")
391 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392 ss.remote_get_buckets, "si1")
393 self.failUnless(" had version 0 but we wanted 1" in str(e), e)
395 def test_disconnect(self):
396 # simulate a disconnection
397 ss = self.create("test_disconnect")
398 canary = FakeCanary()
399 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400 self.failUnlessEqual(already, set())
401 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402 for (f,args,kwargs) in canary.disconnectors.values():
407 # that ought to delete the incoming shares
408 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409 self.failUnlessEqual(already, set())
410 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412 def test_reserved_space(self):
413 ss = self.create("test_reserved_space", reserved_space=10000,
414 klass=FakeDiskStorageServer)
415 # the FakeDiskStorageServer doesn't do real statvfs() calls
417 # 15k available, 10k reserved, leaves 5k for shares
419 # a newly created and filled share incurs this much overhead, beyond
420 # the size we request.
422 LEASE_SIZE = 4+32+32+4
423 canary = FakeCanary(True)
424 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425 self.failUnlessEqual(len(writers), 3)
426 # now the StorageServer should have 3000 bytes provisionally
427 # allocated, allowing only 2000 more to be claimed
428 self.failUnlessEqual(len(ss._active_writers), 3)
430 # allocating 1001-byte shares only leaves room for one
431 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432 self.failUnlessEqual(len(writers2), 1)
433 self.failUnlessEqual(len(ss._active_writers), 4)
435 # we abandon the first set, so their provisional allocation should be
439 self.failUnlessEqual(len(ss._active_writers), 1)
440 # now we have a provisional allocation of 1001 bytes
442 # and we close the second set, so their provisional allocation should
443 # become real, long-term allocation, and grows to include the
445 for bw in writers2.values():
446 bw.remote_write(0, "a"*25)
451 self.failUnlessEqual(len(ss._active_writers), 0)
453 allocated = 1001 + OVERHEAD + LEASE_SIZE
455 # we have to manually increase DISKAVAIL, since we're not doing real
457 ss.DISKAVAIL -= allocated
459 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462 self.failUnlessEqual(len(writers3), 39)
463 self.failUnlessEqual(len(ss._active_writers), 39)
467 self.failUnlessEqual(len(ss._active_writers), 0)
468 ss.disownServiceParent()
472 basedir = self.workdir("test_seek_behavior")
473 fileutil.make_dirs(basedir)
474 filename = os.path.join(basedir, "testfile")
475 f = open(filename, "wb")
478 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
479 # files. mode="a" preserves previous contents but does not allow
480 # seeking-to-create-holes. mode="r+" allows both.
481 f = open(filename, "rb+")
485 filelen = os.stat(filename)[stat.ST_SIZE]
486 self.failUnlessEqual(filelen, 100+3)
487 f2 = open(filename, "rb")
488 self.failUnlessEqual(f2.read(5), "start")
491 def test_leases(self):
492 ss = self.create("test_leases")
493 canary = FakeCanary()
497 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
498 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
499 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
500 sharenums, size, canary)
501 self.failUnlessEqual(len(already), 0)
502 self.failUnlessEqual(len(writers), 5)
503 for wb in writers.values():
506 leases = list(ss.get_leases("si0"))
507 self.failUnlessEqual(len(leases), 1)
508 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
510 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
511 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
512 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
513 sharenums, size, canary)
514 for wb in writers.values():
517 # take out a second lease on si1
518 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
521 sharenums, size, canary)
522 self.failUnlessEqual(len(already), 5)
523 self.failUnlessEqual(len(writers), 0)
525 leases = list(ss.get_leases("si1"))
526 self.failUnlessEqual(len(leases), 2)
527 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
529 # and a third lease, using add-lease
530 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
531 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
532 ss.remote_add_lease("si1", rs2a, cs2a)
533 leases = list(ss.get_leases("si1"))
534 self.failUnlessEqual(len(leases), 3)
535 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
537 # add-lease on a missing storage index is silently ignored
538 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
540 # check that si0 is readable
541 readers = ss.remote_get_buckets("si0")
542 self.failUnlessEqual(len(readers), 5)
544 # renew the first lease. Only the proper renew_secret should work
545 ss.remote_renew_lease("si0", rs0)
546 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
547 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
549 # check that si0 is still readable
550 readers = ss.remote_get_buckets("si0")
551 self.failUnlessEqual(len(readers), 5)
554 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
555 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
556 ss.remote_cancel_lease("si0", cs0)
558 # si0 should now be gone
559 readers = ss.remote_get_buckets("si0")
560 self.failUnlessEqual(len(readers), 0)
561 # and the renew should no longer work
562 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
565 # cancel the first lease on si1, leaving the second and third in place
566 ss.remote_cancel_lease("si1", cs1)
567 readers = ss.remote_get_buckets("si1")
568 self.failUnlessEqual(len(readers), 5)
569 # the corresponding renew should no longer work
570 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
572 leases = list(ss.get_leases("si1"))
573 self.failUnlessEqual(len(leases), 2)
574 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
576 ss.remote_renew_lease("si1", rs2)
577 # cancelling the second and third should make it go away
578 ss.remote_cancel_lease("si1", cs2)
579 ss.remote_cancel_lease("si1", cs2a)
580 readers = ss.remote_get_buckets("si1")
581 self.failUnlessEqual(len(readers), 0)
582 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
583 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
584 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
586 leases = list(ss.get_leases("si1"))
587 self.failUnlessEqual(len(leases), 0)
590 # test overlapping uploads
591 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
592 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
593 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
594 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
595 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
596 sharenums, size, canary)
597 self.failUnlessEqual(len(already), 0)
598 self.failUnlessEqual(len(writers), 5)
599 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
600 sharenums, size, canary)
601 self.failUnlessEqual(len(already2), 0)
602 self.failUnlessEqual(len(writers2), 0)
603 for wb in writers.values():
606 leases = list(ss.get_leases("si3"))
607 self.failUnlessEqual(len(leases), 1)
609 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
610 sharenums, size, canary)
611 self.failUnlessEqual(len(already3), 5)
612 self.failUnlessEqual(len(writers3), 0)
614 leases = list(ss.get_leases("si3"))
615 self.failUnlessEqual(len(leases), 2)
617 def test_readonly(self):
618 workdir = self.workdir("test_readonly")
619 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
620 ss.setServiceParent(self.sparent)
622 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
623 self.failUnlessEqual(already, set())
624 self.failUnlessEqual(writers, {})
626 stats = ss.get_stats()
627 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
629 if "storage_server.disk_avail" in stats:
630 # windows does not have os.statvfs, so it doesn't give us disk
631 # stats. But if there are stats, readonly_storage means
633 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
635 def test_discard(self):
636 # discard is really only used for other tests, but we test it anyways
637 workdir = self.workdir("test_discard")
638 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
639 ss.setServiceParent(self.sparent)
641 canary = FakeCanary()
642 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
643 self.failUnlessEqual(already, set())
644 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
645 for i,wb in writers.items():
646 wb.remote_write(0, "%25d" % i)
648 # since we discard the data, the shares should be present but sparse.
649 # Since we write with some seeks, the data we read back will be all
651 b = ss.remote_get_buckets("vid")
652 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
653 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
655 def test_advise_corruption(self):
656 workdir = self.workdir("test_advise_corruption")
657 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
658 ss.setServiceParent(self.sparent)
660 si0_s = base32.b2a("si0")
661 ss.remote_advise_corrupt_share("immutable", "si0", 0,
662 "This share smells funny.\n")
663 reportdir = os.path.join(workdir, "corruption-advisories")
664 reports = os.listdir(reportdir)
665 self.failUnlessEqual(len(reports), 1)
666 report_si0 = reports[0]
667 self.failUnless(si0_s in report_si0, report_si0)
668 f = open(os.path.join(reportdir, report_si0), "r")
671 self.failUnless("type: immutable" in report)
672 self.failUnless(("storage_index: %s" % si0_s) in report)
673 self.failUnless("share_number: 0" in report)
674 self.failUnless("This share smells funny." in report)
676 # test the RIBucketWriter version too
677 si1_s = base32.b2a("si1")
678 already,writers = self.allocate(ss, "si1", [1], 75)
679 self.failUnlessEqual(already, set())
680 self.failUnlessEqual(set(writers.keys()), set([1]))
681 writers[1].remote_write(0, "data")
682 writers[1].remote_close()
684 b = ss.remote_get_buckets("si1")
685 self.failUnlessEqual(set(b.keys()), set([1]))
686 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
688 reports = os.listdir(reportdir)
689 self.failUnlessEqual(len(reports), 2)
690 report_si1 = [r for r in reports if si1_s in r][0]
691 f = open(os.path.join(reportdir, report_si1), "r")
694 self.failUnless("type: immutable" in report)
695 self.failUnless(("storage_index: %s" % si1_s) in report)
696 self.failUnless("share_number: 1" in report)
697 self.failUnless("This share tastes like dust." in report)
701 class MutableServer(unittest.TestCase):
704 self.sparent = LoggingServiceParent()
705 self._lease_secret = itertools.count()
707 return self.sparent.stopService()
709 def workdir(self, name):
710 basedir = os.path.join("storage", "MutableServer", name)
713 def create(self, name):
714 workdir = self.workdir(name)
715 ss = StorageServer(workdir, "\x00" * 20)
716 ss.setServiceParent(self.sparent)
719 def test_create(self):
720 ss = self.create("test_create")
722 def write_enabler(self, we_tag):
723 return hashutil.tagged_hash("we_blah", we_tag)
725 def renew_secret(self, tag):
726 return hashutil.tagged_hash("renew_blah", str(tag))
728 def cancel_secret(self, tag):
729 return hashutil.tagged_hash("cancel_blah", str(tag))
731 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
732 write_enabler = self.write_enabler(we_tag)
733 renew_secret = self.renew_secret(lease_tag)
734 cancel_secret = self.cancel_secret(lease_tag)
735 rstaraw = ss.remote_slot_testv_and_readv_and_writev
736 testandwritev = dict( [ (shnum, ([], [], None) )
737 for shnum in sharenums ] )
739 rc = rstaraw(storage_index,
740 (write_enabler, renew_secret, cancel_secret),
743 (did_write, readv_data) = rc
744 self.failUnless(did_write)
745 self.failUnless(isinstance(readv_data, dict))
746 self.failUnlessEqual(len(readv_data), 0)
748 def test_bad_magic(self):
749 ss = self.create("test_bad_magic")
750 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
751 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
756 read = ss.remote_slot_readv
757 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
758 read, "si1", [0], [(0,10)])
759 self.failUnless(" had magic " in str(e), e)
760 self.failUnless(" but we wanted " in str(e), e)
762 def test_container_size(self):
763 ss = self.create("test_container_size")
764 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
766 read = ss.remote_slot_readv
767 rstaraw = ss.remote_slot_testv_and_readv_and_writev
768 secrets = ( self.write_enabler("we1"),
769 self.renew_secret("we1"),
770 self.cancel_secret("we1") )
771 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
772 answer = rstaraw("si1", secrets,
773 {0: ([], [(0,data)], len(data)+12)},
775 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
777 # trying to make the container too large will raise an exception
778 TOOBIG = MutableShareFile.MAX_SIZE + 10
779 self.failUnlessRaises(DataTooLargeError,
780 rstaraw, "si1", secrets,
781 {0: ([], [(0,data)], TOOBIG)},
784 # it should be possible to make the container smaller, although at
785 # the moment this doesn't actually affect the share, unless the
786 # container size is dropped to zero, in which case the share is
788 answer = rstaraw("si1", secrets,
789 {0: ([], [(0,data)], len(data)+8)},
791 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
793 answer = rstaraw("si1", secrets,
794 {0: ([], [(0,data)], 0)},
796 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
798 read_answer = read("si1", [0], [(0,10)])
799 self.failUnlessEqual(read_answer, {})
801 def test_allocate(self):
802 ss = self.create("test_allocate")
803 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
806 read = ss.remote_slot_readv
807 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
809 self.failUnlessEqual(read("si1", [], [(0, 10)]),
810 {0: [""], 1: [""], 2: [""]})
811 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
815 secrets = ( self.write_enabler("we1"),
816 self.renew_secret("we1"),
817 self.cancel_secret("we1") )
818 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
819 write = ss.remote_slot_testv_and_readv_and_writev
820 answer = write("si1", secrets,
821 {0: ([], [(0,data)], None)},
823 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
825 self.failUnlessEqual(read("si1", [0], [(0,20)]),
826 {0: ["00000000001111111111"]})
827 self.failUnlessEqual(read("si1", [0], [(95,10)]),
829 #self.failUnlessEqual(s0.remote_get_length(), 100)
831 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
832 f = self.failUnlessRaises(BadWriteEnablerError,
833 write, "si1", bad_secrets,
835 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
837 # this testv should fail
838 answer = write("si1", secrets,
839 {0: ([(0, 12, "eq", "444444444444"),
840 (20, 5, "eq", "22222"),
847 self.failUnlessEqual(answer, (False,
848 {0: ["000000000011", "22222"],
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
855 answer = write("si1", secrets,
856 {0: ([(10, 5, "lt", "11111"),
863 self.failUnlessEqual(answer, (False,
868 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
871 def test_operators(self):
872 # test operators, the data we're comparing is '11111' in all cases.
873 # test both fail+pass, reset data after each one.
874 ss = self.create("test_operators")
876 secrets = ( self.write_enabler("we1"),
877 self.renew_secret("we1"),
878 self.cancel_secret("we1") )
879 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
880 write = ss.remote_slot_testv_and_readv_and_writev
881 read = ss.remote_slot_readv
884 write("si1", secrets,
885 {0: ([], [(0,data)], None)},
891 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
896 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
901 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
906 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
910 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
915 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
920 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
925 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
929 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
934 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
938 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
943 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
948 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
953 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
957 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
962 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
967 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
972 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
976 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
981 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
986 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
991 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
995 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1000 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1004 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1009 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1014 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1019 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1023 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1028 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1032 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1037 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1041 # finally, test some operators against empty shares
1042 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1047 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1051 def test_readv(self):
1052 ss = self.create("test_readv")
1053 secrets = ( self.write_enabler("we1"),
1054 self.renew_secret("we1"),
1055 self.cancel_secret("we1") )
1056 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1057 write = ss.remote_slot_testv_and_readv_and_writev
1058 read = ss.remote_slot_readv
1059 data = [("%d" % i) * 100 for i in range(3)]
1060 rc = write("si1", secrets,
1061 {0: ([], [(0,data[0])], None),
1062 1: ([], [(0,data[1])], None),
1063 2: ([], [(0,data[2])], None),
1065 self.failUnlessEqual(rc, (True, {}))
1067 answer = read("si1", [], [(0, 10)])
1068 self.failUnlessEqual(answer, {0: ["0"*10],
1072 def compare_leases_without_timestamps(self, leases_a, leases_b):
1073 self.failUnlessEqual(len(leases_a), len(leases_b))
1074 for i in range(len(leases_a)):
1077 self.failUnlessEqual(a.owner_num, b.owner_num)
1078 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1079 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1080 self.failUnlessEqual(a.nodeid, b.nodeid)
1082 def compare_leases(self, leases_a, leases_b):
1083 self.failUnlessEqual(len(leases_a), len(leases_b))
1084 for i in range(len(leases_a)):
1087 self.failUnlessEqual(a.owner_num, b.owner_num)
1088 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1089 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1090 self.failUnlessEqual(a.nodeid, b.nodeid)
1091 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1093 def test_leases(self):
1094 ss = self.create("test_leases")
1096 return ( self.write_enabler("we1"),
1097 self.renew_secret("we1-%d" % n),
1098 self.cancel_secret("we1-%d" % n) )
1099 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100 write = ss.remote_slot_testv_and_readv_and_writev
1101 read = ss.remote_slot_readv
1102 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103 self.failUnlessEqual(rc, (True, {}))
1105 # create a random non-numeric file in the bucket directory, to
1106 # exercise the code that's supposed to ignore those.
1107 bucket_dir = os.path.join(self.workdir("test_leases"),
1108 "shares", storage_index_to_dir("si1"))
1109 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110 f.write("you ought to be ignoring me\n")
1113 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1116 # add-lease on a missing storage index is silently ignored
1117 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1119 # re-allocate the slots and use the same secrets, that should update
1121 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1125 ss.remote_renew_lease("si1", secrets(0)[1])
1126 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1128 # now allocate them with a bunch of different secrets, to trigger the
1129 # extended lease code. Use add_lease for one of them.
1130 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132 secrets2 = secrets(2)
1133 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1139 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1141 # cancel one of them
1142 ss.remote_cancel_lease("si1", secrets(5)[2])
1143 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1145 all_leases = list(s0.get_leases())
1146 # and write enough data to expand the container, forcing the server
1147 # to move the leases
1148 write("si1", secrets(0),
1149 {0: ([], [(0,data)], 200), },
1152 # read back the leases, make sure they're still intact.
1153 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1155 ss.remote_renew_lease("si1", secrets(0)[1])
1156 ss.remote_renew_lease("si1", secrets(1)[1])
1157 ss.remote_renew_lease("si1", secrets(2)[1])
1158 ss.remote_renew_lease("si1", secrets(3)[1])
1159 ss.remote_renew_lease("si1", secrets(4)[1])
1160 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161 # get a new copy of the leases, with the current timestamps. Reading
1162 # data and failing to renew/cancel leases should leave the timestamps
1164 all_leases = list(s0.get_leases())
1165 # renewing with a bogus token should prompt an error message
1167 # examine the exception thus raised, make sure the old nodeid is
1168 # present, to provide for share migration
1169 e = self.failUnlessRaises(IndexError,
1170 ss.remote_renew_lease, "si1",
1173 self.failUnless("Unable to renew non-existent lease" in e_s)
1174 self.failUnless("I have leases accepted by nodeids:" in e_s)
1175 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1177 # same for cancelling
1178 self.failUnlessRaises(IndexError,
1179 ss.remote_cancel_lease, "si1",
1181 self.compare_leases(all_leases, list(s0.get_leases()))
1183 # reading shares should not modify the timestamp
1184 read("si1", [], [(0,200)])
1185 self.compare_leases(all_leases, list(s0.get_leases()))
1187 write("si1", secrets(0),
1188 {0: ([], [(200, "make me bigger")], None)}, [])
1189 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1191 write("si1", secrets(0),
1192 {0: ([], [(500, "make me really bigger")], None)}, [])
1193 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1195 # now cancel them all
1196 ss.remote_cancel_lease("si1", secrets(0)[2])
1197 ss.remote_cancel_lease("si1", secrets(1)[2])
1198 ss.remote_cancel_lease("si1", secrets(2)[2])
1199 ss.remote_cancel_lease("si1", secrets(3)[2])
1201 # the slot should still be there
1202 remaining_shares = read("si1", [], [(0,10)])
1203 self.failUnlessEqual(len(remaining_shares), 1)
1204 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1206 # cancelling a non-existent lease should raise an IndexError
1207 self.failUnlessRaises(IndexError,
1208 ss.remote_cancel_lease, "si1", "nonsecret")
1210 # and the slot should still be there
1211 remaining_shares = read("si1", [], [(0,10)])
1212 self.failUnlessEqual(len(remaining_shares), 1)
1213 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1215 ss.remote_cancel_lease("si1", secrets(4)[2])
1216 # now the slot should be gone
1217 no_shares = read("si1", [], [(0,10)])
1218 self.failUnlessEqual(no_shares, {})
1220 # cancelling a lease on a non-existent share should raise an IndexError
1221 self.failUnlessRaises(IndexError,
1222 ss.remote_cancel_lease, "si2", "nonsecret")
1224 def test_remove(self):
1225 ss = self.create("test_remove")
1226 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1228 readv = ss.remote_slot_readv
1229 writev = ss.remote_slot_testv_and_readv_and_writev
1230 secrets = ( self.write_enabler("we1"),
1231 self.renew_secret("we1"),
1232 self.cancel_secret("we1") )
1233 # delete sh0 by setting its size to zero
1234 answer = writev("si1", secrets,
1237 # the answer should mention all the shares that existed before the
1239 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240 # but a new read should show only sh1 and sh2
1241 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1244 # delete sh1 by setting its size to zero
1245 answer = writev("si1", secrets,
1248 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1252 # delete sh2 by setting its size to zero
1253 answer = writev("si1", secrets,
1256 self.failUnlessEqual(answer, (True, {2:[]}) )
1257 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1259 # and the bucket directory should now be gone
1260 si = base32.b2a("si1")
1261 # note: this is a detail of the storage server implementation, and
1262 # may change in the future
1264 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265 bucketdir = os.path.join(prefixdir, si)
1266 self.failUnless(os.path.exists(prefixdir))
1267 self.failIf(os.path.exists(bucketdir))
1269 class Stats(unittest.TestCase):
1272 self.sparent = LoggingServiceParent()
1273 self._lease_secret = itertools.count()
1275 return self.sparent.stopService()
1277 def workdir(self, name):
1278 basedir = os.path.join("storage", "Server", name)
1281 def create(self, name):
1282 workdir = self.workdir(name)
1283 ss = StorageServer(workdir, "\x00" * 20)
1284 ss.setServiceParent(self.sparent)
1287 def test_latencies(self):
1288 ss = self.create("test_latencies")
1289 for i in range(10000):
1290 ss.add_latency("allocate", 1.0 * i)
1291 for i in range(1000):
1292 ss.add_latency("renew", 1.0 * i)
1294 ss.add_latency("cancel", 2.0 * i)
1295 ss.add_latency("get", 5.0)
1297 output = ss.get_latencies()
1299 self.failUnlessEqual(sorted(output.keys()),
1300 sorted(["allocate", "renew", "cancel", "get"]))
1301 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1311 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1314 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1321 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1324 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1325 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1331 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1342 s = re.sub(r'<[^>]*>', ' ', s)
1343 s = re.sub(r'\s+', ' ', s)
1346 class MyBucketCountingCrawler(BucketCountingCrawler):
1347 def finished_prefix(self, cycle, prefix):
1348 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1350 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer variant whose bucket counter is the hook-firing crawler.

    Overrides add_bucket_counter() so tests can synchronize on
    MyBucketCountingCrawler's per-prefix Deferreds instead of polling.
    """
    def add_bucket_counter(self):
        # Same on-disk state file the stock counter would use; only the
        # crawler class differs.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
1359 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1362 self.s = service.MultiService()
1363 self.s.startService()
1365 return self.s.stopService()
1367 def test_bucket_counter(self):
1368 basedir = "storage/BucketCounter/bucket_counter"
1369 fileutil.make_dirs(basedir)
1370 ss = StorageServer(basedir, "\x00" * 20)
1371 # to make sure we capture the bucket-counting-crawler in the middle
1372 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1373 # also make it start sooner than usual.
1374 ss.bucket_counter.slow_start = 0
1375 orig_cpu_slice = ss.bucket_counter.cpu_slice
1376 ss.bucket_counter.cpu_slice = 0
1377 ss.setServiceParent(self.s)
1379 w = StorageStatus(ss)
1381 # this sample is before the crawler has started doing anything
1382 html = w.renderSynchronously()
1383 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384 s = remove_tags(html)
1385 self.failUnless("Accepting new shares: Yes" in s, s)
1386 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387 self.failUnless("Total buckets: Not computed yet" in s, s)
1388 self.failUnless("Next crawl in" in s, s)
1390 # give the bucket-counting-crawler one tick to get started. The
1391 # cpu_slice=0 will force it to yield right after it processes the
1394 d = fireEventually()
1395 def _check(ignored):
1396 # are we really right after the first prefix?
1397 state = ss.bucket_counter.get_state()
1398 self.failUnlessEqual(state["last-complete-prefix"],
1399 ss.bucket_counter.prefixes[0])
1400 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401 html = w.renderSynchronously()
1402 s = remove_tags(html)
1403 self.failUnless(" Current crawl " in s, s)
1404 self.failUnless(" (next work in " in s, s)
1405 d.addCallback(_check)
1407 # now give it enough time to complete a full cycle
1409 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410 d.addCallback(lambda ignored: self.poll(_watch))
1411 def _check2(ignored):
1412 ss.bucket_counter.cpu_slice = orig_cpu_slice
1413 html = w.renderSynchronously()
1414 s = remove_tags(html)
1415 self.failUnless("Total buckets: 0 (the number of" in s, s)
1416 self.failUnless("Next crawl in 59 minutes" in s, s)
1417 d.addCallback(_check2)
1420 def test_bucket_counter_cleanup(self):
1421 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422 fileutil.make_dirs(basedir)
1423 ss = StorageServer(basedir, "\x00" * 20)
1424 # to make sure we capture the bucket-counting-crawler in the middle
1425 # of a cycle, we reach in and reduce its maximum slice time to 0.
1426 ss.bucket_counter.slow_start = 0
1427 orig_cpu_slice = ss.bucket_counter.cpu_slice
1428 ss.bucket_counter.cpu_slice = 0
1429 ss.setServiceParent(self.s)
1431 d = fireEventually()
1433 def _after_first_prefix(ignored):
1434 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435 # now sneak in and mess with its state, to make sure it cleans up
1436 # properly at the end of the cycle
1437 state = ss.bucket_counter.state
1438 self.failUnlessEqual(state["last-complete-prefix"],
1439 ss.bucket_counter.prefixes[0])
1440 state["bucket-counts"][-12] = {}
1441 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442 ss.bucket_counter.save_state()
1443 d.addCallback(_after_first_prefix)
1445 # now give it enough time to complete a cycle
1447 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448 d.addCallback(lambda ignored: self.poll(_watch))
1449 def _check2(ignored):
1450 ss.bucket_counter.cpu_slice = orig_cpu_slice
1451 s = ss.bucket_counter.get_state()
1452 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453 self.failIf("bogusprefix!" in s["storage-index-samples"],
1454 s["storage-index-samples"].keys())
1455 d.addCallback(_check2)
1458 def test_bucket_counter_eta(self):
1459 basedir = "storage/BucketCounter/bucket_counter_eta"
1460 fileutil.make_dirs(basedir)
1461 ss = MyStorageServer(basedir, "\x00" * 20)
1462 ss.bucket_counter.slow_start = 0
1463 # these will be fired inside finished_prefix()
1464 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465 w = StorageStatus(ss)
1467 d = defer.Deferred()
1469 def _check_1(ignored):
1470 # no ETA is available yet
1471 html = w.renderSynchronously()
1472 s = remove_tags(html)
1473 self.failUnlessIn("complete (next work", s)
1475 def _check_2(ignored):
1476 # one prefix has finished, so an ETA based upon that elapsed time
1477 # should be available.
1478 html = w.renderSynchronously()
1479 s = remove_tags(html)
1480 self.failUnlessIn("complete (ETA ", s)
1482 def _check_3(ignored):
1483 # two prefixes have finished
1484 html = w.renderSynchronously()
1485 s = remove_tags(html)
1486 self.failUnlessIn("complete (ETA ", s)
1489 hooks[0].addCallback(_check_1).addErrback(d.errback)
1490 hooks[1].addCallback(_check_2).addErrback(d.errback)
1491 hooks[2].addCallback(_check_3).addErrback(d.errback)
1493 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that can pause itself right after one bucket.

    Set stop_after_first_bucket=True and the crawler drops cpu_slice to a
    negative value after the next bucket it processes, forcing it to yield
    immediately so the test can examine mid-cycle state.
    """
    # one-shot flag: consumed by the first process_bucket() call after it
    # is set
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # consume the flag and make the crawler believe its time slice is
        # exhausted
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # once the one-shot pause has fired, run with a generous slice so
        # the rest of the cycle finishes quickly
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
1507 class BrokenStatResults:
1509 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1512 bsr = BrokenStatResults()
1513 for attrname in dir(s):
1514 if attrname.startswith("_"):
1516 if attrname == "st_blocks":
1518 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    # Use the crawler subclass that can stop itself after the first bucket,
    # so tests can observe mid-cycle lease-checker state.
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # Use the crawler whose stat() results omit st_blocks — presumably to
    # exercise the code path for platforms without that field; confirm
    # against No_ST_BLOCKS_LeaseCheckingCrawler.
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1526 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1529 self.s = service.MultiService()
1530 self.s.startService()
1532 return self.s.stopService()
1534 def make_shares(self, ss):
1536 return (si, hashutil.tagged_hash("renew", si),
1537 hashutil.tagged_hash("cancel", si))
1538 def make_mutable(si):
1539 return (si, hashutil.tagged_hash("renew", si),
1540 hashutil.tagged_hash("cancel", si),
1541 hashutil.tagged_hash("write-enabler", si))
1542 def make_extra_lease(si, num):
1543 return (hashutil.tagged_hash("renew-%d" % num, si),
1544 hashutil.tagged_hash("cancel-%d" % num, si))
1546 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1553 canary = FakeCanary()
1554 # note: 'tahoe debug dump-share' will not handle this file, since the
1555 # inner contents are not a valid CHK share
1556 data = "\xff" * 1000
1558 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1560 w[0].remote_write(0, data)
1563 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1565 w[0].remote_write(0, data)
1567 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1569 writev = ss.remote_slot_testv_and_readv_and_writev
1570 writev(mutable_si_2, (we2, rs2, cs2),
1571 {0: ([], [(0,data)], len(data))}, [])
1572 writev(mutable_si_3, (we3, rs3, cs3),
1573 {0: ([], [(0,data)], len(data))}, [])
1574 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1576 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1580 def test_basic(self):
1581 basedir = "storage/LeaseCrawler/basic"
1582 fileutil.make_dirs(basedir)
1583 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584 # make it start sooner than usual.
1585 lc = ss.lease_checker
1588 lc.stop_after_first_bucket = True
1589 webstatus = StorageStatus(ss)
1591 # create a few shares, with some leases on them
1592 self.make_shares(ss)
1593 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1595 # add a non-sharefile to exercise another code path
1596 fn = os.path.join(ss.sharedir,
1597 storage_index_to_dir(immutable_si_0),
1600 f.write("I am not a share.\n")
1603 # this is before the crawl has started, so we're not in a cycle yet
1604 initial_state = lc.get_state()
1605 self.failIf(lc.get_progress()["cycle-in-progress"])
1606 self.failIf("cycle-to-date" in initial_state)
1607 self.failIf("estimated-remaining-cycle" in initial_state)
1608 self.failIf("estimated-current-cycle" in initial_state)
1609 self.failUnless("history" in initial_state)
1610 self.failUnlessEqual(initial_state["history"], {})
1612 ss.setServiceParent(self.s)
1616 d = fireEventually()
1618 # now examine the state right after the first bucket has been
1620 def _after_first_bucket(ignored):
1621 initial_state = lc.get_state()
1622 self.failUnless("cycle-to-date" in initial_state)
1623 self.failUnless("estimated-remaining-cycle" in initial_state)
1624 self.failUnless("estimated-current-cycle" in initial_state)
1625 self.failUnless("history" in initial_state)
1626 self.failUnlessEqual(initial_state["history"], {})
1628 so_far = initial_state["cycle-to-date"]
1629 self.failUnlessEqual(so_far["expiration-enabled"], False)
1630 self.failUnless("configured-expiration-mode" in so_far)
1631 self.failUnless("lease-age-histogram" in so_far)
1632 lah = so_far["lease-age-histogram"]
1633 self.failUnlessEqual(type(lah), list)
1634 self.failUnlessEqual(len(lah), 1)
1635 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637 self.failUnlessEqual(so_far["corrupt-shares"], [])
1638 sr1 = so_far["space-recovered"]
1639 self.failUnlessEqual(sr1["examined-buckets"], 1)
1640 self.failUnlessEqual(sr1["examined-shares"], 1)
1641 self.failUnlessEqual(sr1["actual-shares"], 0)
1642 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1643 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1644 left = initial_state["estimated-remaining-cycle"]
1645 sr2 = left["space-recovered"]
1646 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1647 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1648 self.failIfEqual(sr2["actual-shares"], None)
1649 self.failIfEqual(sr2["configured-diskbytes"], None)
1650 self.failIfEqual(sr2["original-sharebytes"], None)
1651 d.addCallback(_after_first_bucket)
1652 d.addCallback(lambda ign: self.render1(webstatus))
1653 def _check_html_in_cycle(html):
1654 s = remove_tags(html)
1655 self.failUnlessIn("So far, this cycle has examined "
1656 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1657 self.failUnlessIn("and has recovered: "
1658 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1659 "0 B (0 B / 0 B)", s)
1660 self.failUnlessIn("If expiration were enabled, "
1661 "we would have recovered: "
1662 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1663 " 0 B (0 B / 0 B) by now", s)
1664 self.failUnlessIn("and the remainder of this cycle "
1665 "would probably recover: "
1666 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1667 " 0 B (0 B / 0 B)", s)
1668 self.failUnlessIn("and the whole cycle would probably recover: "
1669 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1670 " 0 B (0 B / 0 B)", s)
1671 self.failUnlessIn("if we were strictly using each lease's default "
1672 "31-day lease lifetime", s)
1673 self.failUnlessIn("this cycle would be expected to recover: ", s)
1674 d.addCallback(_check_html_in_cycle)
1676 # wait for the crawler to finish the first cycle. Nothing should have
1679 return bool(lc.get_state()["last-cycle-finished"] is not None)
1680 d.addCallback(lambda ign: self.poll(_wait))
1682 def _after_first_cycle(ignored):
1684 self.failIf("cycle-to-date" in s)
1685 self.failIf("estimated-remaining-cycle" in s)
1686 self.failIf("estimated-current-cycle" in s)
1687 last = s["history"][0]
1688 self.failUnless("cycle-start-finish-times" in last)
1689 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1690 self.failUnlessEqual(last["expiration-enabled"], False)
1691 self.failUnless("configured-expiration-mode" in last)
1693 self.failUnless("lease-age-histogram" in last)
1694 lah = last["lease-age-histogram"]
1695 self.failUnlessEqual(type(lah), list)
1696 self.failUnlessEqual(len(lah), 1)
1697 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1699 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1700 self.failUnlessEqual(last["corrupt-shares"], [])
1702 rec = last["space-recovered"]
1703 self.failUnlessEqual(rec["examined-buckets"], 4)
1704 self.failUnlessEqual(rec["examined-shares"], 4)
1705 self.failUnlessEqual(rec["actual-buckets"], 0)
1706 self.failUnlessEqual(rec["original-buckets"], 0)
1707 self.failUnlessEqual(rec["configured-buckets"], 0)
1708 self.failUnlessEqual(rec["actual-shares"], 0)
1709 self.failUnlessEqual(rec["original-shares"], 0)
1710 self.failUnlessEqual(rec["configured-shares"], 0)
1711 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1712 self.failUnlessEqual(rec["original-diskbytes"], 0)
1713 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1714 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1715 self.failUnlessEqual(rec["original-sharebytes"], 0)
1716 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1718 def _get_sharefile(si):
1719 return list(ss._iter_share_files(si))[0]
1720 def count_leases(si):
1721 return len(list(_get_sharefile(si).get_leases()))
1722 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1723 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1724 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1725 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1726 d.addCallback(_after_first_cycle)
1727 d.addCallback(lambda ign: self.render1(webstatus))
1728 def _check_html(html):
1729 s = remove_tags(html)
1730 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1731 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1732 "but expiration was not enabled", s)
1733 d.addCallback(_check_html)
1734 d.addCallback(lambda ign: self.render_json(webstatus))
1735 def _check_json(json):
1736 data = simplejson.loads(json)
1737 self.failUnless("lease-checker" in data)
1738 self.failUnless("lease-checker-progress" in data)
1739 d.addCallback(_check_json)
1742 def backdate_lease(self, sf, renew_secret, new_expire_time):
1743 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1744 # "renew" a lease with a new_expire_time that is older than what the
1745 # current lease has), so we have to reach inside it.
1746 for i,lease in enumerate(sf.get_leases()):
1747 if lease.renew_secret == renew_secret:
1748 lease.expiration_time = new_expire_time
1749 f = open(sf.home, 'rb+')
1750 sf._write_lease_record(f, i, lease)
1753 raise IndexError("unable to renew non-existent lease")
1755 def test_expire_age(self):
1756 basedir = "storage/LeaseCrawler/expire_age"
1757 fileutil.make_dirs(basedir)
1758 # setting expiration_time to 2000 means that any lease which is more
1759 # than 2000s old will be expired.
1760 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1761 expiration_enabled=True,
1762 expiration_mode="age",
1763 expiration_override_lease_duration=2000)
1764 # make it start sooner than usual.
1765 lc = ss.lease_checker
1767 lc.stop_after_first_bucket = True
1768 webstatus = StorageStatus(ss)
1770 # create a few shares, with some leases on them
1771 self.make_shares(ss)
1772 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1774 def count_shares(si):
1775 return len(list(ss._iter_share_files(si)))
1776 def _get_sharefile(si):
1777 return list(ss._iter_share_files(si))[0]
1778 def count_leases(si):
1779 return len(list(_get_sharefile(si).get_leases()))
1781 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1782 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1783 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1784 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1785 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1786 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1787 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1788 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1790 # artificially crank back the expiration time on the first lease of
1791 # each share, to make it look like it expired already (age=1000s).
1792 # Some shares have an extra lease which is set to expire at the
1793 # default time in 31 days from now (age=31days). We then run the
1794 # crawler, which will expire the first lease, making some shares get
1795 # deleted and others stay alive (with one remaining lease)
1798 sf0 = _get_sharefile(immutable_si_0)
1799 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1800 sf0_size = os.stat(sf0.home).st_size
1802 # immutable_si_1 gets an extra lease
1803 sf1 = _get_sharefile(immutable_si_1)
1804 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1806 sf2 = _get_sharefile(mutable_si_2)
1807 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1808 sf2_size = os.stat(sf2.home).st_size
1810 # mutable_si_3 gets an extra lease
1811 sf3 = _get_sharefile(mutable_si_3)
1812 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1814 ss.setServiceParent(self.s)
1816 d = fireEventually()
1817 # examine the state right after the first bucket has been processed
1818 def _after_first_bucket(ignored):
1819 p = lc.get_progress()
1820 self.failUnless(p["cycle-in-progress"])
1821 d.addCallback(_after_first_bucket)
1822 d.addCallback(lambda ign: self.render1(webstatus))
1823 def _check_html_in_cycle(html):
1824 s = remove_tags(html)
1825 # the first bucket encountered gets deleted, and its prefix
1826 # happens to be about 1/5th of the way through the ring, so the
1827 # predictor thinks we'll have 5 shares and that we'll delete them
1828 # all. This part of the test depends upon the SIs landing right
1829 # where they do now.
1830 self.failUnlessIn("The remainder of this cycle is expected to "
1831 "recover: 4 shares, 4 buckets", s)
1832 self.failUnlessIn("The whole cycle is expected to examine "
1833 "5 shares in 5 buckets and to recover: "
1834 "5 shares, 5 buckets", s)
1835 d.addCallback(_check_html_in_cycle)
1837 # wait for the crawler to finish the first cycle. Two shares should
1840 return bool(lc.get_state()["last-cycle-finished"] is not None)
1841 d.addCallback(lambda ign: self.poll(_wait))
1843 def _after_first_cycle(ignored):
1844 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1845 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1846 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1847 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1848 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1849 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1852 last = s["history"][0]
1854 self.failUnlessEqual(last["expiration-enabled"], True)
1855 self.failUnlessEqual(last["configured-expiration-mode"],
1856 ("age", 2000, None, ("mutable", "immutable")))
1857 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1859 rec = last["space-recovered"]
1860 self.failUnlessEqual(rec["examined-buckets"], 4)
1861 self.failUnlessEqual(rec["examined-shares"], 4)
1862 self.failUnlessEqual(rec["actual-buckets"], 2)
1863 self.failUnlessEqual(rec["original-buckets"], 2)
1864 self.failUnlessEqual(rec["configured-buckets"], 2)
1865 self.failUnlessEqual(rec["actual-shares"], 2)
1866 self.failUnlessEqual(rec["original-shares"], 2)
1867 self.failUnlessEqual(rec["configured-shares"], 2)
1868 size = sf0_size + sf2_size
1869 self.failUnlessEqual(rec["actual-sharebytes"], size)
1870 self.failUnlessEqual(rec["original-sharebytes"], size)
1871 self.failUnlessEqual(rec["configured-sharebytes"], size)
1872 # different platforms have different notions of "blocks used by
1873 # this file", so merely assert that it's a number
1874 self.failUnless(rec["actual-diskbytes"] >= 0,
1875 rec["actual-diskbytes"])
1876 self.failUnless(rec["original-diskbytes"] >= 0,
1877 rec["original-diskbytes"])
1878 self.failUnless(rec["configured-diskbytes"] >= 0,
1879 rec["configured-diskbytes"])
1880 d.addCallback(_after_first_cycle)
1881 d.addCallback(lambda ign: self.render1(webstatus))
1882 def _check_html(html):
1883 s = remove_tags(html)
1884 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1885 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1886 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1887 d.addCallback(_check_html)
    def test_expire_cutoff_date(self):
        """With expiration_mode="cutoff-date", leases last renewed before the
        cutoff are cancelled, and shares left with no leases are deleted."""
        basedir = "storage/LeaseCrawler/expire_cutoff_date"
        fileutil.make_dirs(basedir)
        # setting cutoff-date to 2000 seconds ago means that any lease which
        # is more than 2000s old will be expired.
        # NOTE(review): the listing appears truncated here -- a binding for
        # `now` (presumably now = time.time()) is missing.
        then = int(now - 2000)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="cutoff-date",
                                       expiration_cutoff_date=then)
        # make it start sooner than usual.
        lc = ss.lease_checker
        # NOTE(review): a crawler speed-up line (presumably lc.slow_start = 0)
        # appears to be missing here.
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            # number of share files currently stored for this storage index
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            # the first (and only) share file for this storage index
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            # number of leases on that share file
            return len(list(_get_sharefile(si).get_leases()))

        # baseline created by make_shares: one share per SI; si_1 and si_3
        # carry a second lease.
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like was renewed 3000s ago. To achieve
        # this, we need to set the expiration time to now-3000+31days. This
        # will change when the lease format is improved to contain both
        # create/renew time and duration.
        new_expiration_time = now - 3000 + 31*24*60*60

        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            self.failUnless(p["cycle-in-progress"])
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
        # NOTE(review): the listing appears truncated here -- the end of this
        # comment and the 'def _wait():' header for the following return are
        # missing.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            # expired shares (si_0, si_2) are gone; the survivors kept
            # exactly one lease each
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)
            # NOTE(review): a binding for `s` (presumably s = lc.get_state())
            # appears to be missing here.
            last = s["history"][0]
            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("cutoff-date", None, then,
                                  ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"],
            # NOTE(review): the expected-histogram argument and closing paren
            # of this assertion appear to be missing from this listing.

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled:"
                              " expired leases will be removed", s)
            date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
            substr = "Leases created or last renewed before %s will be considered expired." % date
            self.failUnlessIn(substr, s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def test_only_immutable(self):
        """With expiration_sharetypes=("immutable",) only immutable shares
        are expired; mutable shares keep all their leases."""
        basedir = "storage/LeaseCrawler/only_immutable"
        fileutil.make_dirs(basedir)
        # NOTE(review): a binding for `now` (presumably now = time.time())
        # appears to be missing from this listing.
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("immutable",))
        lc = ss.lease_checker
        # NOTE(review): crawler speed-up lines appear to be missing here.
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            # number of share files currently stored for this storage index
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            # the first (and only) share file for this storage index
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            # number of leases on that share file
            return len(list(_get_sharefile(si).get_leases()))

        # backdate every lease on every share, so all are past the cutoff
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        # NOTE(review): the 'def _wait():' header for the following return
        # appears to be missing from this listing.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            # both immutable shares are gone; mutable shares (and all their
            # leases) survive
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 0)
            self.failUnlessEqual(count_shares(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
        d.addCallback(_check_html)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def test_only_mutable(self):
        """With expiration_sharetypes=("mutable",) only mutable shares are
        expired; immutable shares keep all their leases."""
        basedir = "storage/LeaseCrawler/only_mutable"
        fileutil.make_dirs(basedir)
        # NOTE(review): a binding for `now` (presumably now = time.time())
        # appears to be missing from this listing.
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("mutable",))
        lc = ss.lease_checker
        # NOTE(review): crawler speed-up lines appear to be missing here.
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            # number of share files currently stored for this storage index
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            # the first (and only) share file for this storage index
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            # number of leases on that share file
            return len(list(_get_sharefile(si).get_leases()))

        # backdate every lease on every share, so all are past the cutoff
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        # NOTE(review): the 'def _wait():' header for the following return
        # appears to be missing from this listing.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            # immutable shares keep all their leases; both mutable shares
            # are gone
            self.failUnlessEqual(count_shares(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 0)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
        d.addCallback(_check_html)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
2150 def test_bad_mode(self):
2151 basedir = "storage/LeaseCrawler/bad_mode"
2152 fileutil.make_dirs(basedir)
2153 e = self.failUnlessRaises(ValueError,
2154 StorageServer, basedir, "\x00" * 20,
2155 expiration_mode="bogus")
2156 self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))
    def test_parse_duration(self):
        """time_format.parse_duration understands day/month/year suffixes
        (with or without a space, singular or plural) and rejects unknown
        units with ValueError."""
        # NOTE(review): definitions of DAY, MONTH, and YEAR (durations in
        # seconds) appear to be missing from this listing.
        p = time_format.parse_duration
        self.failUnlessEqual(p("7days"), 7*DAY)
        self.failUnlessEqual(p("31day"), 31*DAY)
        self.failUnlessEqual(p("60 days"), 60*DAY)
        self.failUnlessEqual(p("2mo"), 2*MONTH)
        self.failUnlessEqual(p("3 month"), 3*MONTH)
        self.failUnlessEqual(p("2years"), 2*YEAR)
        e = self.failUnlessRaises(ValueError, p, "2kumquats")
        self.failUnless("no unit (like day, month, or year) in '2kumquats'"
        # NOTE(review): the continuation of this assertion (the containment
        # target, e.g. `in str(e)`, and closing paren) appears to be missing
        # from this listing.
2173 def test_parse_date(self):
2174 p = time_format.parse_date
2175 self.failUnless(isinstance(p("2009-03-18"), int))
2176 self.failUnlessEqual(p("2009-03-18"), 1237334400)
    def test_limited_history(self):
        """The lease checker retains only the 10 most recent cycle reports."""
        basedir = "storage/LeaseCrawler/limited_history"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        # NOTE(review): crawler speed-up lines (e.g. slow_start / cpu_slice
        # settings) appear to be missing from this listing.

        # create a few shares, with some leases on them
        self.make_shares(ss)

        ss.setServiceParent(self.s)

        def _wait_until_15_cycles_done():
            last = lc.state["last-cycle-finished"]
            if last is not None and last >= 15:
            # NOTE(review): the body of this `if` and the rest of the poll
            # predicate (presumably return True / return False) appear to be
            # missing from this listing.
        d = self.poll(_wait_until_15_cycles_done)

        def _check(ignored):
            # NOTE(review): a binding for the history dict `h` (presumably
            # taken from lc.get_state()["history"]) appears to be missing
            # here.
            # only the most recent 10 cycles (6..15) should be retained
            self.failUnlessEqual(len(h), 10)
            self.failUnlessEqual(max(h.keys()), 15)
            self.failUnlessEqual(min(h.keys()), 6)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def test_unpredictable_future(self):
        """If the crawler is interrupted before it has made measurable
        progress, every space-recovered estimate must be None ("cannot
        predict the future")."""
        basedir = "storage/LeaseCrawler/unpredictable_future"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        # NOTE(review): a slow_start setting appears to be missing here.
        lc.cpu_slice = -1.0 # stop quickly

        self.make_shares(ss)

        ss.setServiceParent(self.s)

        d = fireEventually()
        def _check(ignored):
            # this should fire after the first bucket is complete, but before
            # the first prefix is complete, so the progress-measurer won't
            # think we've gotten far enough to raise our percent-complete
            # above 0%, triggering the cannot-predict-the-future code in
            # expirer.py . This will have to change if/when the
            # progress-measurer gets smart enough to count buckets (we'll
            # have to interrupt it even earlier, before it's finished the
            # NOTE(review): the end of this comment and a binding for `s`
            # (presumably s = lc.get_state()) appear to be missing from this
            # listing.
            self.failUnless("cycle-to-date" in s)
            self.failUnless("estimated-remaining-cycle" in s)
            self.failUnless("estimated-current-cycle" in s)

            left = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(left["actual-buckets"], None)
            self.failUnlessEqual(left["original-buckets"], None)
            self.failUnlessEqual(left["configured-buckets"], None)
            self.failUnlessEqual(left["actual-shares"], None)
            self.failUnlessEqual(left["original-shares"], None)
            self.failUnlessEqual(left["configured-shares"], None)
            self.failUnlessEqual(left["actual-diskbytes"], None)
            self.failUnlessEqual(left["original-diskbytes"], None)
            self.failUnlessEqual(left["configured-diskbytes"], None)
            self.failUnlessEqual(left["actual-sharebytes"], None)
            self.failUnlessEqual(left["original-sharebytes"], None)
            self.failUnlessEqual(left["configured-sharebytes"], None)

            # NOTE(review): `full` re-reads "estimated-remaining-cycle";
            # given the variable name and the earlier key-presence check,
            # "estimated-current-cycle" was probably intended -- verify
            # against expirer.py before changing.
            full = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(full["actual-buckets"], None)
            self.failUnlessEqual(full["original-buckets"], None)
            self.failUnlessEqual(full["configured-buckets"], None)
            self.failUnlessEqual(full["actual-shares"], None)
            self.failUnlessEqual(full["original-shares"], None)
            self.failUnlessEqual(full["configured-shares"], None)
            self.failUnlessEqual(full["actual-diskbytes"], None)
            self.failUnlessEqual(full["original-diskbytes"], None)
            self.failUnlessEqual(full["configured-diskbytes"], None)
            self.failUnlessEqual(full["actual-sharebytes"], None)
            self.failUnlessEqual(full["original-sharebytes"], None)
            self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def test_no_st_blocks(self):
        """Without os.stat()'s st_blocks field, the crawler must report
        diskbytes equal to sharebytes."""
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_time= means the "configured-"
        # space-recovered counts will be non-zero, since all shares will have
        # NOTE(review): the end of this comment (presumably "...expired by
        # then") appears to be missing from this listing.

        # make it start sooner than usual.
        lc = ss.lease_checker
        # NOTE(review): crawler speed-up lines (slow_start / cpu_slice)
        # appear to be missing here.

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        # NOTE(review): the 'def _wait():' header for the following return
        # appears to be missing from this listing.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _check(ignored):
            # NOTE(review): a binding for `s` (presumably s = lc.get_state())
            # appears to be missing here.
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def test_share_corruption(self):
        """A share file with a corrupted header is reported as a corrupt
        share (in crawler state, JSON, and HTML) and the crawler keeps
        going past it."""
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
        # NOTE(review): the closing ']' of this list appears to be missing
        # from this listing.
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        # NOTE(review): additional crawler speed-up lines appear to be
        # missing here.

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        # NOTE(review): the lines opening `fn` for writing (binding `f`) and
        # later closing it appear to be missing from this listing.
        f.write("BAD MAGIC")

        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # NOTE(review): the end of this comment (presumably "processed")
        # appears to be missing from this listing.
        def _after_first_bucket(ignored):
            so_far = lc.get_state()["cycle-to-date"]
            rec = so_far["space-recovered"]
            # the corrupt share counts as an examined bucket but not as an
            # examined share
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)
        # NOTE(review): the 'def _wait():' header for the following return
        # appears to be missing from this listing.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            # NOTE(review): a binding for `s` (presumably s = lc.get_state())
            # appears to be missing here.
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        # NOTE(review): the wiring around this cleanup appears truncated;
        # the flushLoggedErrors call was presumably inside a final callback,
        # and the trailing 'return d' is missing.
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
    def render_json(self, page):
        """Render `page` with the ?t=json argument and return the Deferred."""
        d = self.render1(page, args={"t": ["json"]})
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
class NoStatvfsServer(StorageServer):
    """A StorageServer that behaves as though os.statvfs did not exist.

    Used to exercise the status page on platforms (such as Windows) that
    lack statvfs support.
    """
    def do_statvfs(self):
        # fail exactly the way a missing os.statvfs attribute would
        raise AttributeError
2412 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2415 self.s = service.MultiService()
2416 self.s.startService()
2418 return self.s.stopService()
2420 def test_no_server(self):
2421 w = StorageStatus(None)
2422 html = w.renderSynchronously()
2423 self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
    def test_status(self):
        """The status page (HTML and JSON) reports accepting-shares and
        reserved-space for a plain StorageServer."""
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        # NOTE(review): a binding for `d` (presumably d = self.render1(w))
        # appears to be missing from this listing.
        def _check_html(html):
            self.failUnless("<h1>Storage Server Status</h1>" in html, html)
            s = remove_tags(html)
            self.failUnless("Accepting new shares: Yes" in s, s)
            self.failUnless("Reserved space: - 0 B (0)" in s, s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # NOTE(review): a binding for `s` (presumably a stats sub-dict
            # of `data`) appears to be missing here -- verify against the
            # JSON layout produced by web.storage.
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnless("bucket-counter" in data)
            self.failUnless("lease-checker" in data)
        d.addCallback(_check_json)
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
    def render_json(self, page):
        """Render `page` with the ?t=json argument and return the Deferred."""
        d = self.render1(page, args={"t": ["json"]})
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this listing.
2453 def test_status_no_statvfs(self):
2454 # windows has no os.statvfs . Make sure the code handles that even on
2456 basedir = "storage/WebStatus/status_no_statvfs"
2457 fileutil.make_dirs(basedir)
2458 ss = NoStatvfsServer(basedir, "\x00" * 20)
2459 ss.setServiceParent(self.s)
2460 w = StorageStatus(ss)
2461 html = w.renderSynchronously()
2462 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2463 s = remove_tags(html)
2464 self.failUnless("Accepting new shares: Yes" in s, s)
2465 self.failUnless("Total disk space: ?" in s, s)
2467 def test_readonly(self):
2468 basedir = "storage/WebStatus/readonly"
2469 fileutil.make_dirs(basedir)
2470 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2471 ss.setServiceParent(self.s)
2472 w = StorageStatus(ss)
2473 html = w.renderSynchronously()
2474 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2475 s = remove_tags(html)
2476 self.failUnless("Accepting new shares: No" in s, s)
2478 def test_reserved(self):
2479 basedir = "storage/WebStatus/reserved"
2480 fileutil.make_dirs(basedir)
2481 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2482 ss.setServiceParent(self.s)
2483 w = StorageStatus(ss)
2484 html = w.renderSynchronously()
2485 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2486 s = remove_tags(html)
2487 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2489 def test_huge_reserved(self):
2490 basedir = "storage/WebStatus/reserved"
2491 fileutil.make_dirs(basedir)
2492 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2493 ss.setServiceParent(self.s)
2494 w = StorageStatus(ss)
2495 html = w.renderSynchronously()
2496 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2497 s = remove_tags(html)
2498 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
    def test_util(self):
        """Spot-check the StorageStatus rendering helpers and remove_prefix."""
        w = StorageStatus(None)
        # render_space / render_abbrev_space tolerate None ("?") and format
        # byte counts as raw digits / abbreviated units respectively
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        # remove_prefix returns the suffix, or None when the prefix is absent
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)