2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
class FakeStatsProvider:
    """Stats-provider stub: accepts counter bumps and producer
    registrations and silently discards them (these tests never
    inspect stats)."""
    def count(self, name, delta=1):
        # No-op: a real provider would increment the named counter by delta.
        pass
    def register_producer(self, producer):
        # No-op: a real provider would poll the producer for stats.
        pass
50 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Create storage/Bucket/<name> (and its tmp/ subdir) and return
    the (incoming, final) share-file paths inside it."""
    base = os.path.join("storage", "Bucket", name)
    fileutil.make_dirs(base)
    fileutil.make_dirs(os.path.join(base, "tmp"))
    return (os.path.join(base, "tmp", "bucket"),
            os.path.join(base, "bucket"))
def bucket_writer_closed(self, bw, consumed):
    # BucketWriter reports back to its "server" (this test case) on
    # close; nothing to record here.
    pass
def add_latency(self, category, latency):
    pass  # stats hook, unused by these tests
def count(self, name, delta=1):
    pass  # stats hook, unused by these tests
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
def make_lease(self):
    """Return a fresh LeaseInfo: owner 0, random secrets, and an
    expiration ~5000 seconds from now."""
    owner_num = 0
    renew_secret = os.urandom(32)
    cancel_secret = os.urandom(32)
    expiration_time = time.time() + 5000
    return LeaseInfo(owner_num, renew_secret, cancel_secret,
                     expiration_time, "\x00" * 20)
def bucket_writer_closed(self, bw, consumed):
    # server-side close callback; ignored by these tests
    pass
def add_latency(self, category, latency):
    pass  # stats hook, unused
def count(self, name, delta=1):
    pass  # stats hook, unused
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnlessIn("to peer", repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    # v1 share layout: 0x24-byte header.
    return self._do_test_readwrite("test_readwrite_v1", 0x24,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    # v2 share layout: larger 0x44-byte header.
    return self._do_test_readwrite("test_readwrite_v2", 0x44,
                                   WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    # Test double: get_disk_stats() returns canned numbers instead of
    # probing the real filesystem; tests drive it by assigning DISKAVAIL
    # directly (see test_reserved_space).
    # NOTE(review): one elided line between the header and this method is
    # assumed to have held no code — confirm against upstream.
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL,
                 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }
235 class Server(unittest.TestCase):
238 self.sparent = LoggingServiceParent()
239 self.sparent.startService()
240 self._lease_secret = itertools.count()
242 return self.sparent.stopService()
def workdir(self, name):
    """Return (without creating) this test's scratch directory path."""
    return os.path.join("storage", "Server", name)
def create(self, name, reserved_space=0, klass=StorageServer):
    """Build a StorageServer (or subclass) rooted in this test's
    workdir, start it under the test's service parent, and return it."""
    workdir = self.workdir(name)
    ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
               stats_provider=FakeStatsProvider())
    ss.setServiceParent(self.sparent)
    return ss
def test_create(self):
    # constructing and starting the server must not raise
    self.create("test_create")
def allocate(self, ss, storage_index, sharenums, size, canary=None):
    """Ask ss to allocate buckets, minting fresh lease secrets on each
    call so successive allocations hold distinct leases."""
    renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
    cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
    if not canary:
        canary = FakeCanary()
    return ss.remote_allocate_buckets(storage_index,
                                      renew_secret, cancel_secret,
                                      sharenums, size, canary)
def test_large_share(self):
    # exercise a share big enough to need offsets beyond 2**32
    ss = self.create("test_large_share")

    existing, writers = self.allocate(ss, "allocate", [0], 2**32+2)
    self.failUnlessEqual(existing, set())
    self.failUnlessEqual(set(writers.keys()), set([0]))

    shnum, bucket = writers.items()[0]
    # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
    bucket.remote_write(2**32, "ab")
    bucket.remote_close()

    buckets = ss.remote_get_buckets("allocate")
    self.failUnlessEqual(buckets[shnum].remote_read(2**32, 2), "ab")
test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
def test_dont_overfill_dirs(self):
    """
    This test asserts that if you add a second share whose storage index
    share lots of leading bits with an extant share (but isn't the exact
    same storage index), this won't add an entry to the share directory.
    """
    ss = self.create("test_dont_overfill_dirs")
    already, writers = self.allocate(ss, "storageindex", [0], 10)
    for i, wb in writers.items():
        wb.remote_write(0, "%10d" % i)
        wb.remote_close()
    storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                            "shares")
    children_of_storedir = set(os.listdir(storedir))

    # Now store another one under another storageindex that has leading
    # chars the same as the first storageindex.
    already, writers = self.allocate(ss, "storageindey", [0], 10)
    for i, wb in writers.items():
        wb.remote_write(0, "%10d" % i)
        wb.remote_close()
    storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                            "shares")
    new_children_of_storedir = set(os.listdir(storedir))
    # same prefix directory entries as before: no new prefix dir added
    self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
def test_remove_incoming(self):
    ss = self.create("test_remove_incoming")
    already, writers = self.allocate(ss, "vid", range(3), 10)
    for i, wb in writers.items():
        wb.remote_write(0, "%10d" % i)
        wb.remote_close()
    # walk up the incoming/ hierarchy from the last writer's share file
    incoming_share_dir = wb.incominghome
    incoming_bucket_dir = os.path.dirname(incoming_share_dir)
    incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
    incoming_dir = os.path.dirname(incoming_prefix_dir)
    # closing should have pruned the now-empty bucket and prefix dirs,
    # but left the top-level incoming/ directory in place
    self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
    self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
    self.failUnless(os.path.exists(incoming_dir), incoming_dir)
324 def test_allocate(self):
325 ss = self.create("test_allocate")
327 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 self.failUnlessIn("BucketReader", b_str)
349 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
351 # now if we ask about writing again, the server should offer those
352 # three buckets as already present. It should offer them even if we
353 # don't ask about those specific ones.
354 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355 self.failUnlessEqual(already, set([0,1,2]))
356 self.failUnlessEqual(set(writers.keys()), set([3,4]))
358 # while those two buckets are open for writing, the server should
359 # refuse to offer them to uploaders
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 # aborting the writes should remove the tempfiles
366 for i,wb in writers2.items():
368 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369 self.failUnlessEqual(already2, set([0,1,2]))
370 self.failUnlessEqual(set(writers2.keys()), set([5]))
372 for i,wb in writers2.items():
374 for i,wb in writers.items():
377 def test_bad_container_version(self):
378 ss = self.create("test_bad_container_version")
379 a,w = self.allocate(ss, "si1", [0], 10)
380 w[0].remote_write(0, "\xff"*10)
383 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389 ss.remote_get_buckets("allocate")
391 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392 ss.remote_get_buckets, "si1")
393 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
395 def test_disconnect(self):
396 # simulate a disconnection
397 ss = self.create("test_disconnect")
398 canary = FakeCanary()
399 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400 self.failUnlessEqual(already, set())
401 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402 for (f,args,kwargs) in canary.disconnectors.values():
407 # that ought to delete the incoming shares
408 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409 self.failUnlessEqual(already, set())
410 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412 def test_reserved_space(self):
413 ss = self.create("test_reserved_space", reserved_space=10000,
414 klass=FakeDiskStorageServer)
415 # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
417 # 15k available, 10k reserved, leaves 5k for shares
419 # a newly created and filled share incurs this much overhead, beyond
420 # the size we request.
422 LEASE_SIZE = 4+32+32+4
423 canary = FakeCanary(True)
424 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425 self.failUnlessEqual(len(writers), 3)
426 # now the StorageServer should have 3000 bytes provisionally
427 # allocated, allowing only 2000 more to be claimed
428 self.failUnlessEqual(len(ss._active_writers), 3)
430 # allocating 1001-byte shares only leaves room for one
431 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432 self.failUnlessEqual(len(writers2), 1)
433 self.failUnlessEqual(len(ss._active_writers), 4)
435 # we abandon the first set, so their provisional allocation should be
439 self.failUnlessEqual(len(ss._active_writers), 1)
440 # now we have a provisional allocation of 1001 bytes
442 # and we close the second set, so their provisional allocation should
443 # become real, long-term allocation, and grows to include the
445 for bw in writers2.values():
446 bw.remote_write(0, "a"*25)
451 self.failUnlessEqual(len(ss._active_writers), 0)
453 allocated = 1001 + OVERHEAD + LEASE_SIZE
455 # we have to manually increase DISKAVAIL, since we're not doing real
457 ss.DISKAVAIL -= allocated
459 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462 self.failUnlessEqual(len(writers3), 39)
463 self.failUnlessEqual(len(ss._active_writers), 39)
467 self.failUnlessEqual(len(ss._active_writers), 0)
468 ss.disownServiceParent()
def test_disk_stats(self):
    # This will spuriously fail if there is zero disk space left (but so will other tests).
    ss = self.create("test_disk_stats", reserved_space=0)

    disk = ss.get_disk_stats()
    # every reported quantity must be strictly positive on a live filesystem
    for key in ('total', 'used', 'free_for_root', 'free_for_nonroot',
                'avail'):
        self.failUnless(disk[key] > 0, disk[key])
def test_disk_stats_avail_nonnegative(self):
    # an absurdly large reservation must clamp 'avail' at zero rather
    # than reporting a negative number
    ss = self.create("test_disk_stats_avail_nonnegative",
                     reserved_space=2**64)

    disk = ss.get_disk_stats()
    self.failUnlessEqual(disk['avail'], 0)
489 basedir = self.workdir("test_seek_behavior")
490 fileutil.make_dirs(basedir)
491 filename = os.path.join(basedir, "testfile")
492 f = open(filename, "wb")
495 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
496 # files. mode="a" preserves previous contents but does not allow
497 # seeking-to-create-holes. mode="r+" allows both.
498 f = open(filename, "rb+")
502 filelen = os.stat(filename)[stat.ST_SIZE]
503 self.failUnlessEqual(filelen, 100+3)
504 f2 = open(filename, "rb")
505 self.failUnlessEqual(f2.read(5), "start")
508 def test_leases(self):
509 ss = self.create("test_leases")
510 canary = FakeCanary()
514 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
515 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
516 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
517 sharenums, size, canary)
518 self.failUnlessEqual(len(already), 0)
519 self.failUnlessEqual(len(writers), 5)
520 for wb in writers.values():
523 leases = list(ss.get_leases("si0"))
524 self.failUnlessEqual(len(leases), 1)
525 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
527 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
528 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
529 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
530 sharenums, size, canary)
531 for wb in writers.values():
534 # take out a second lease on si1
535 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
536 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
537 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
538 sharenums, size, canary)
539 self.failUnlessEqual(len(already), 5)
540 self.failUnlessEqual(len(writers), 0)
542 leases = list(ss.get_leases("si1"))
543 self.failUnlessEqual(len(leases), 2)
544 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
546 # and a third lease, using add-lease
547 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
548 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
549 ss.remote_add_lease("si1", rs2a, cs2a)
550 leases = list(ss.get_leases("si1"))
551 self.failUnlessEqual(len(leases), 3)
552 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
554 # add-lease on a missing storage index is silently ignored
555 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
557 # check that si0 is readable
558 readers = ss.remote_get_buckets("si0")
559 self.failUnlessEqual(len(readers), 5)
561 # renew the first lease. Only the proper renew_secret should work
562 ss.remote_renew_lease("si0", rs0)
563 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
564 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
566 # check that si0 is still readable
567 readers = ss.remote_get_buckets("si0")
568 self.failUnlessEqual(len(readers), 5)
571 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
572 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
573 ss.remote_cancel_lease("si0", cs0)
575 # si0 should now be gone
576 readers = ss.remote_get_buckets("si0")
577 self.failUnlessEqual(len(readers), 0)
578 # and the renew should no longer work
579 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
582 # cancel the first lease on si1, leaving the second and third in place
583 ss.remote_cancel_lease("si1", cs1)
584 readers = ss.remote_get_buckets("si1")
585 self.failUnlessEqual(len(readers), 5)
586 # the corresponding renew should no longer work
587 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
589 leases = list(ss.get_leases("si1"))
590 self.failUnlessEqual(len(leases), 2)
591 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
593 ss.remote_renew_lease("si1", rs2)
594 # cancelling the second and third should make it go away
595 ss.remote_cancel_lease("si1", cs2)
596 ss.remote_cancel_lease("si1", cs2a)
597 readers = ss.remote_get_buckets("si1")
598 self.failUnlessEqual(len(readers), 0)
599 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
600 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
601 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
603 leases = list(ss.get_leases("si1"))
604 self.failUnlessEqual(len(leases), 0)
607 # test overlapping uploads
608 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
611 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
612 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
613 sharenums, size, canary)
614 self.failUnlessEqual(len(already), 0)
615 self.failUnlessEqual(len(writers), 5)
616 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
617 sharenums, size, canary)
618 self.failUnlessEqual(len(already2), 0)
619 self.failUnlessEqual(len(writers2), 0)
620 for wb in writers.values():
623 leases = list(ss.get_leases("si3"))
624 self.failUnlessEqual(len(leases), 1)
626 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
627 sharenums, size, canary)
628 self.failUnlessEqual(len(already3), 5)
629 self.failUnlessEqual(len(writers3), 0)
631 leases = list(ss.get_leases("si3"))
632 self.failUnlessEqual(len(leases), 2)
def test_readonly(self):
    basedir = self.workdir("test_readonly")
    ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
    ss.setServiceParent(self.sparent)

    # a read-only server must accept no shares at all
    already, writers = self.allocate(ss, "vid", [0,1,2], 75)
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(writers, {})

    stats = ss.get_stats()
    self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
    if "storage_server.disk_avail" in stats:
        # Some platforms may not have an API to get disk stats.
        # But if there are stats, readonly_storage means disk_avail=0
        self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
def test_discard(self):
    # discard is really only used for other tests, but we test it anyways
    workdir = self.workdir("test_discard")
    ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
    ss.setServiceParent(self.sparent)

    already, writers = self.allocate(ss, "vid", [0,1,2], 75)
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    for i, wb in writers.items():
        wb.remote_write(0, "%25d" % i)
        wb.remote_close()
    # since we discard the data, the shares should be present but sparse.
    # Since we write with some seeks, the data we read back will be all
    # zeros.
    b = ss.remote_get_buckets("vid")
    self.failUnlessEqual(set(b.keys()), set([0,1,2]))
    self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
669 def test_advise_corruption(self):
670 workdir = self.workdir("test_advise_corruption")
671 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
672 ss.setServiceParent(self.sparent)
674 si0_s = base32.b2a("si0")
675 ss.remote_advise_corrupt_share("immutable", "si0", 0,
676 "This share smells funny.\n")
677 reportdir = os.path.join(workdir, "corruption-advisories")
678 reports = os.listdir(reportdir)
679 self.failUnlessEqual(len(reports), 1)
680 report_si0 = reports[0]
681 self.failUnlessIn(si0_s, report_si0)
682 f = open(os.path.join(reportdir, report_si0), "r")
685 self.failUnlessIn("type: immutable", report)
686 self.failUnlessIn("storage_index: %s" % si0_s, report)
687 self.failUnlessIn("share_number: 0", report)
688 self.failUnlessIn("This share smells funny.", report)
690 # test the RIBucketWriter version too
691 si1_s = base32.b2a("si1")
692 already,writers = self.allocate(ss, "si1", [1], 75)
693 self.failUnlessEqual(already, set())
694 self.failUnlessEqual(set(writers.keys()), set([1]))
695 writers[1].remote_write(0, "data")
696 writers[1].remote_close()
698 b = ss.remote_get_buckets("si1")
699 self.failUnlessEqual(set(b.keys()), set([1]))
700 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
702 reports = os.listdir(reportdir)
703 self.failUnlessEqual(len(reports), 2)
704 report_si1 = [r for r in reports if si1_s in r][0]
705 f = open(os.path.join(reportdir, report_si1), "r")
708 self.failUnlessIn("type: immutable", report)
709 self.failUnlessIn("storage_index: %s" % si1_s, report)
710 self.failUnlessIn("share_number: 1", report)
711 self.failUnlessIn("This share tastes like dust.", report)
715 class MutableServer(unittest.TestCase):
718 self.sparent = LoggingServiceParent()
719 self._lease_secret = itertools.count()
721 return self.sparent.stopService()
def workdir(self, name):
    """Return (without creating) this test's scratch directory path."""
    return os.path.join("storage", "MutableServer", name)
def create(self, name):
    """Build a StorageServer in this test's workdir, start it under the
    test's service parent, and return it."""
    workdir = self.workdir(name)
    ss = StorageServer(workdir, "\x00" * 20)
    ss.setServiceParent(self.sparent)
    return ss
def test_create(self):
    # constructing and starting the server must not raise
    self.create("test_create")
def write_enabler(self, we_tag):
    # deterministic write-enabler derived from the tag
    return hashutil.tagged_hash("we_blah", we_tag)
def renew_secret(self, tag):
    # deterministic lease renew-secret for the given tag
    tag_str = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_str)
def cancel_secret(self, tag):
    # deterministic lease cancel-secret for the given tag
    tag_str = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_str)
def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
    """Create empty mutable slots for sharenums via the test-and-set
    wire API, and sanity-check the (accepted, read-data) response."""
    write_enabler = self.write_enabler(we_tag)
    renew_secret = self.renew_secret(lease_tag)
    cancel_secret = self.cancel_secret(lease_tag)
    rstaraw = ss.remote_slot_testv_and_readv_and_writev
    # empty test vector, no writes, no new container size: just create
    testandwritev = dict( [ (shnum, ([], [], None) )
                            for shnum in sharenums ] )
    readv = []
    rc = rstaraw(storage_index,
                 (write_enabler, renew_secret, cancel_secret),
                 testandwritev,
                 readv)
    (did_write, readv_data) = rc
    self.failUnless(did_write)
    self.failUnless(isinstance(readv_data, dict))
    self.failUnlessEqual(len(readv_data), 0)
762 def test_bad_magic(self):
763 ss = self.create("test_bad_magic")
764 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
765 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
770 read = ss.remote_slot_readv
771 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
772 read, "si1", [0], [(0,10)])
773 self.failUnlessIn(" had magic ", str(e))
774 self.failUnlessIn(" but we wanted ", str(e))
776 def test_container_size(self):
777 ss = self.create("test_container_size")
778 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
780 read = ss.remote_slot_readv
781 rstaraw = ss.remote_slot_testv_and_readv_and_writev
782 secrets = ( self.write_enabler("we1"),
783 self.renew_secret("we1"),
784 self.cancel_secret("we1") )
785 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
786 answer = rstaraw("si1", secrets,
787 {0: ([], [(0,data)], len(data)+12)},
789 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
791 # trying to make the container too large will raise an exception
792 TOOBIG = MutableShareFile.MAX_SIZE + 10
793 self.failUnlessRaises(DataTooLargeError,
794 rstaraw, "si1", secrets,
795 {0: ([], [(0,data)], TOOBIG)},
798 # it should be possible to make the container smaller, although at
799 # the moment this doesn't actually affect the share, unless the
800 # container size is dropped to zero, in which case the share is
802 answer = rstaraw("si1", secrets,
803 {0: ([], [(0,data)], len(data)+8)},
805 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
807 answer = rstaraw("si1", secrets,
808 {0: ([], [(0,data)], 0)},
810 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
812 read_answer = read("si1", [0], [(0,10)])
813 self.failUnlessEqual(read_answer, {})
815 def test_allocate(self):
816 ss = self.create("test_allocate")
817 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
820 read = ss.remote_slot_readv
821 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
823 self.failUnlessEqual(read("si1", [], [(0, 10)]),
824 {0: [""], 1: [""], 2: [""]})
825 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
829 secrets = ( self.write_enabler("we1"),
830 self.renew_secret("we1"),
831 self.cancel_secret("we1") )
832 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
833 write = ss.remote_slot_testv_and_readv_and_writev
834 answer = write("si1", secrets,
835 {0: ([], [(0,data)], None)},
837 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
839 self.failUnlessEqual(read("si1", [0], [(0,20)]),
840 {0: ["00000000001111111111"]})
841 self.failUnlessEqual(read("si1", [0], [(95,10)]),
843 #self.failUnlessEqual(s0.remote_get_length(), 100)
845 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
846 f = self.failUnlessRaises(BadWriteEnablerError,
847 write, "si1", bad_secrets,
849 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
851 # this testv should fail
852 answer = write("si1", secrets,
853 {0: ([(0, 12, "eq", "444444444444"),
854 (20, 5, "eq", "22222"),
861 self.failUnlessEqual(answer, (False,
862 {0: ["000000000011", "22222"],
866 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
869 answer = write("si1", secrets,
870 {0: ([(10, 5, "lt", "11111"),
877 self.failUnlessEqual(answer, (False,
882 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
885 def test_operators(self):
886 # test operators, the data we're comparing is '11111' in all cases.
887 # test both fail+pass, reset data after each one.
888 ss = self.create("test_operators")
890 secrets = ( self.write_enabler("we1"),
891 self.renew_secret("we1"),
892 self.cancel_secret("we1") )
893 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
894 write = ss.remote_slot_testv_and_readv_and_writev
895 read = ss.remote_slot_readv
898 write("si1", secrets,
899 {0: ([], [(0,data)], None)},
905 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
910 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
911 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
912 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
915 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
920 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
921 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
924 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
929 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
930 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
934 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
939 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
940 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
943 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
948 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
949 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
952 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
957 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
958 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
962 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
967 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
968 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
971 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
976 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
977 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
981 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
986 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
987 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
995 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
996 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1000 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1005 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1006 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1009 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1014 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1015 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1018 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1023 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1024 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1028 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1033 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1034 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1037 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1042 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1043 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1046 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1051 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1052 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1055 # finally, test some operators against empty shares
1056 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1061 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1062 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1065 def test_readv(self):
# Write three shares (sh0, sh1, sh2) into a single mutable slot, then
# verify that remote_slot_readv with an empty wanted-shares list returns
# data for every share present.
# NOTE(review): several original lines are missing from this chunk (the
# close of the writev dict around orig line 1078, and the remainder of
# the expected-answer dict after orig 1082) -- this block is incomplete
# as shown; do not treat it as runnable.
1066 ss = self.create("test_readv")
1067 secrets = ( self.write_enabler("we1"),
1068 self.renew_secret("we1"),
1069 self.cancel_secret("we1") )
1070 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1071 write = ss.remote_slot_testv_and_readv_and_writev
1072 read = ss.remote_slot_readv
# 'data' is immediately rebound: one distinct 100-byte string per share.
1073 data = [("%d" % i) * 100 for i in range(3)]
1074 rc = write("si1", secrets,
1075 {0: ([], [(0,data[0])], None),
1076 1: ([], [(0,data[1])], None),
1077 2: ([], [(0,data[2])], None),
1079 self.failUnlessEqual(rc, (True, {}))
# empty wanted-shares list means "read from all shares"
1081 answer = read("si1", [], [(0, 10)])
1082 self.failUnlessEqual(answer, {0: ["0"*10],
1086 def compare_leases_without_timestamps(self, leases_a, leases_b):
# Assert that two lease lists match field-by-field, deliberately
# ignoring expiration_time (used after operations that are allowed to
# refresh timestamps but must preserve lease identity).
# NOTE(review): the lines that bind 'a' and 'b' for each index
# (orig 1089-1090, presumably a = leases_a[i]; b = leases_b[i]) are
# missing from this chunk.
1087 self.failUnlessEqual(len(leases_a), len(leases_b))
1088 for i in range(len(leases_a)):
1091 self.failUnlessEqual(a.owner_num, b.owner_num)
1092 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1093 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1094 self.failUnlessEqual(a.nodeid, b.nodeid)
1096 def compare_leases(self, leases_a, leases_b):
# Strict variant of compare_leases_without_timestamps: also requires
# expiration_time to match, so it detects unintended lease renewals.
# NOTE(review): the per-index binding of 'a' and 'b' (orig 1099-1100)
# is missing from this chunk.
1097 self.failUnlessEqual(len(leases_a), len(leases_b))
1098 for i in range(len(leases_a)):
1101 self.failUnlessEqual(a.owner_num, b.owner_num)
1102 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1103 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1104 self.failUnlessEqual(a.nodeid, b.nodeid)
1105 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1107 def test_leases(self):
# End-to-end lease lifecycle on a mutable slot: create, re-write with
# same/different secrets, add/renew/cancel leases, force the container
# to grow (moving the lease records), and verify lease counts and
# contents at each step.
# NOTE(review): this chunk is missing occasional original lines (e.g.
# the 'def secrets(n):' header before orig 1110, f.close() after orig
# 1124, and several continuation lines); treat as incomplete.
1108 ss = self.create("test_leases")
# secrets(n): write-enabler is shared; renew/cancel secrets vary with n
# so each call can create a distinct lease.
1110 return ( self.write_enabler("we1"),
1111 self.renew_secret("we1-%d" % n),
1112 self.cancel_secret("we1-%d" % n) )
1113 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1114 write = ss.remote_slot_testv_and_readv_and_writev
1115 read = ss.remote_slot_readv
1116 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1117 self.failUnlessEqual(rc, (True, {}))
1119 # create a random non-numeric file in the bucket directory, to
1120 # exercise the code that's supposed to ignore those.
1121 bucket_dir = os.path.join(self.workdir("test_leases"),
1122 "shares", storage_index_to_dir("si1"))
1123 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1124 f.write("you ought to be ignoring me\n")
1127 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1128 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1130 # add-lease on a missing storage index is silently ignored
1131 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1133 # re-allocate the slots and use the same secrets, that should update
1135 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1136 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renewing with a known secret must not add a lease, only refresh it
1139 ss.remote_renew_lease("si1", secrets(0)[1])
1140 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1142 # now allocate them with a bunch of different secrets, to trigger the
1143 # extended lease code. Use add_lease for one of them.
1144 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1145 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1146 secrets2 = secrets(2)
1147 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1148 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1149 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1150 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1151 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1153 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1155 # cancel one of them
1156 ss.remote_cancel_lease("si1", secrets(5)[2])
1157 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1159 all_leases = list(s0.get_leases())
1160 # and write enough data to expand the container, forcing the server
1161 # to move the leases
1162 write("si1", secrets(0),
1163 {0: ([], [(0,data)], 200), },
1166 # read back the leases, make sure they're still intact.
1167 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1169 ss.remote_renew_lease("si1", secrets(0)[1])
1170 ss.remote_renew_lease("si1", secrets(1)[1])
1171 ss.remote_renew_lease("si1", secrets(2)[1])
1172 ss.remote_renew_lease("si1", secrets(3)[1])
1173 ss.remote_renew_lease("si1", secrets(4)[1])
1174 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1175 # get a new copy of the leases, with the current timestamps. Reading
1176 # data and failing to renew/cancel leases should leave the timestamps
1178 all_leases = list(s0.get_leases())
1179 # renewing with a bogus token should prompt an error message
1181 # examine the exception thus raised, make sure the old nodeid is
1182 # present, to provide for share migration
1183 e = self.failUnlessRaises(IndexError,
1184 ss.remote_renew_lease, "si1",
1187 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1188 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1189 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1191 # same for cancelling
1192 self.failUnlessRaises(IndexError,
1193 ss.remote_cancel_lease, "si1",
1195 self.compare_leases(all_leases, list(s0.get_leases()))
1197 # reading shares should not modify the timestamp
1198 read("si1", [], [(0,200)])
1199 self.compare_leases(all_leases, list(s0.get_leases()))
1201 write("si1", secrets(0),
1202 {0: ([], [(200, "make me bigger")], None)}, [])
1203 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1205 write("si1", secrets(0),
1206 {0: ([], [(500, "make me really bigger")], None)}, [])
1207 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1209 # now cancel them all
1210 ss.remote_cancel_lease("si1", secrets(0)[2])
1211 ss.remote_cancel_lease("si1", secrets(1)[2])
1212 ss.remote_cancel_lease("si1", secrets(2)[2])
1213 ss.remote_cancel_lease("si1", secrets(3)[2])
1215 # the slot should still be there
1216 remaining_shares = read("si1", [], [(0,10)])
1217 self.failUnlessEqual(len(remaining_shares), 1)
1218 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1220 # cancelling a non-existent lease should raise an IndexError
1221 self.failUnlessRaises(IndexError,
1222 ss.remote_cancel_lease, "si1", "nonsecret")
1224 # and the slot should still be there
1225 remaining_shares = read("si1", [], [(0,10)])
1226 self.failUnlessEqual(len(remaining_shares), 1)
1227 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# cancelling the last lease deletes the share (and its bucket)
1229 ss.remote_cancel_lease("si1", secrets(4)[2])
1230 # now the slot should be gone
1231 no_shares = read("si1", [], [(0,10)])
1232 self.failUnlessEqual(no_shares, {})
1234 # cancelling a lease on a non-existent share should raise an IndexError
1235 self.failUnlessRaises(IndexError,
1236 ss.remote_cancel_lease, "si2", "nonsecret")
1238 def test_remove(self):
# Delete shares one at a time by writing them with new_length=0, and
# check that the answer tuple names the shares that existed before each
# delete, that reads stop returning the deleted shares, and that the
# bucket directory itself disappears once the last share is gone.
# NOTE(review): several original continuation lines are missing from
# this chunk (e.g. the testv/writev argument lists around orig
# 1249-1250, 1260-1261, 1268-1269, and the 'prefix = ...' line before
# orig 1278); treat as incomplete.
1239 ss = self.create("test_remove")
1240 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1242 readv = ss.remote_slot_readv
1243 writev = ss.remote_slot_testv_and_readv_and_writev
1244 secrets = ( self.write_enabler("we1"),
1245 self.renew_secret("we1"),
1246 self.cancel_secret("we1") )
1247 # delete sh0 by setting its size to zero
1248 answer = writev("si1", secrets,
1251 # the answer should mention all the shares that existed before the
1253 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1254 # but a new read should show only sh1 and sh2
1255 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258 # delete sh1 by setting its size to zero
1259 answer = writev("si1", secrets,
1262 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1263 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1266 # delete sh2 by setting its size to zero
1267 answer = writev("si1", secrets,
1270 self.failUnlessEqual(answer, (True, {2:[]}) )
1271 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273 # and the bucket directory should now be gone
1274 si = base32.b2a("si1")
1275 # note: this is a detail of the storage server implementation, and
1276 # may change in the future
1278 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1279 bucketdir = os.path.join(prefixdir, si)
1280 self.failUnless(os.path.exists(prefixdir), prefixdir)
1281 self.failIf(os.path.exists(bucketdir), bucketdir)
1283 class Stats(unittest.TestCase):
# Exercises StorageServer's latency-statistics machinery: feeds known
# samples into add_latency() and checks mean and percentile buckets in
# get_latencies().
# NOTE(review): several def headers are missing from this chunk (setUp
# at orig 1285, tearDown at orig 1288, plus scattered return lines);
# treat the class as incomplete as shown.
1286 self.sparent = LoggingServiceParent()
1287 self._lease_secret = itertools.count()
1289 return self.sparent.stopService()
1291 def workdir(self, name):
1292 basedir = os.path.join("storage", "Server", name)
1295 def create(self, name):
1296 workdir = self.workdir(name)
1297 ss = StorageServer(workdir, "\x00" * 20)
1298 ss.setServiceParent(self.sparent)
1301 def test_latencies(self):
1302 ss = self.create("test_latencies")
# 10000 "allocate" samples, but assertions below show only the most
# recent 1000 are retained.
1303 for i in range(10000):
1304 ss.add_latency("allocate", 1.0 * i)
1305 for i in range(1000):
1306 ss.add_latency("renew", 1.0 * i)
1308 ss.add_latency("cancel", 2.0 * i)
1309 ss.add_latency("get", 5.0)
1311 output = ss.get_latencies()
1313 self.failUnlessEqual(sorted(output.keys()),
1314 sorted(["allocate", "renew", "cancel", "get"]))
# allocate: samples 9000..9999 survive, so mean ~9500
1315 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1316 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1317 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1318 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1319 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1320 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1321 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1322 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1323 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1325 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1326 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1327 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
1328 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1329 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1330 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1331 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1332 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1333 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
# cancel: only 10 samples, so high percentiles all collapse to the max
1335 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1336 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1337 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1, output)
1338 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
1339 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1340 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1341 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1342 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1343 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
# get: a single sample, so every statistic equals that sample
1345 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1346 self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1347 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1348 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1349 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1350 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1351 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1352 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1353 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
# Fragment of a module-level helper (its 'def remove_tags(s):' header at
# orig 1355 and the trailing return at orig 1358 are missing from this
# chunk): strips HTML tags from a rendered page and collapses runs of
# whitespace to single spaces, so tests can match plain-text snippets.
1356 s = re.sub(r'<[^>]*>', ' ', s)
1357 s = re.sub(r'\s+', ' ', s)
1360 class MyBucketCountingCrawler(BucketCountingCrawler):
# Test double: after each prefix is finished, fires the next Deferred
# from self.hook_ds so tests can synchronize with crawler progress.
# NOTE(review): the guard around hook_ds (orig 1363) and the
# d.callback(...) line (orig 1365) are missing from this chunk.
1361 def finished_prefix(self, cycle, prefix):
1362 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1364 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    def add_bucket_counter(self):
        """Install the hook-instrumented bucket counter instead of the stock one."""
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
1373 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
# Drives the bucket-counting crawler through partial and full cycles,
# checking the StorageStatus web rendering along the way, that stale
# crawler state is cleaned up at cycle end, and that an ETA appears once
# at least one prefix has been timed.
# NOTE(review): def headers and continuation lines are missing in
# several places in this chunk (setUp at orig 1375, tearDown at orig
# 1378, _watch headers at orig 1422/1460, 'return d' lines, and the
# d.addCallback wiring near orig 1506); treat the class as incomplete
# as shown.
1376 self.s = service.MultiService()
1377 self.s.startService()
1379 return self.s.stopService()
1381 def test_bucket_counter(self):
1382 basedir = "storage/BucketCounter/bucket_counter"
1383 fileutil.make_dirs(basedir)
1384 ss = StorageServer(basedir, "\x00" * 20)
1385 # to make sure we capture the bucket-counting-crawler in the middle
1386 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1387 # also make it start sooner than usual.
1388 ss.bucket_counter.slow_start = 0
1389 orig_cpu_slice = ss.bucket_counter.cpu_slice
1390 ss.bucket_counter.cpu_slice = 0
1391 ss.setServiceParent(self.s)
1393 w = StorageStatus(ss)
1395 # this sample is before the crawler has started doing anything
1396 html = w.renderSynchronously()
1397 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1398 s = remove_tags(html)
1399 self.failUnlessIn("Accepting new shares: Yes", s)
1400 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1401 self.failUnlessIn("Total buckets: Not computed yet", s)
1402 self.failUnlessIn("Next crawl in", s)
1404 # give the bucket-counting-crawler one tick to get started. The
1405 # cpu_slice=0 will force it to yield right after it processes the
1408 d = fireEventually()
1409 def _check(ignored):
1410 # are we really right after the first prefix?
1411 state = ss.bucket_counter.get_state()
1412 self.failUnlessEqual(state["last-complete-prefix"],
1413 ss.bucket_counter.prefixes[0])
1414 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1415 html = w.renderSynchronously()
1416 s = remove_tags(html)
1417 self.failUnlessIn(" Current crawl ", s)
1418 self.failUnlessIn(" (next work in ", s)
1419 d.addCallback(_check)
1421 # now give it enough time to complete a full cycle
1423 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1424 d.addCallback(lambda ignored: self.poll(_watch))
1425 def _check2(ignored):
1426 ss.bucket_counter.cpu_slice = orig_cpu_slice
1427 html = w.renderSynchronously()
1428 s = remove_tags(html)
1429 self.failUnlessIn("Total buckets: 0 (the number of", s)
1430 self.failUnlessIn("Next crawl in 59 minutes", s)
1431 d.addCallback(_check2)
1434 def test_bucket_counter_cleanup(self):
1435 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1436 fileutil.make_dirs(basedir)
1437 ss = StorageServer(basedir, "\x00" * 20)
1438 # to make sure we capture the bucket-counting-crawler in the middle
1439 # of a cycle, we reach in and reduce its maximum slice time to 0.
1440 ss.bucket_counter.slow_start = 0
1441 orig_cpu_slice = ss.bucket_counter.cpu_slice
1442 ss.bucket_counter.cpu_slice = 0
1443 ss.setServiceParent(self.s)
1445 d = fireEventually()
1447 def _after_first_prefix(ignored):
1448 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1449 # now sneak in and mess with its state, to make sure it cleans up
1450 # properly at the end of the cycle
1451 state = ss.bucket_counter.state
1452 self.failUnlessEqual(state["last-complete-prefix"],
1453 ss.bucket_counter.prefixes[0])
# -12 and "bogusprefix!" are sentinel keys that a clean cycle-end must
# discard; _check2 below asserts they are gone.
1454 state["bucket-counts"][-12] = {}
1455 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1456 ss.bucket_counter.save_state()
1457 d.addCallback(_after_first_prefix)
1459 # now give it enough time to complete a cycle
1461 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1462 d.addCallback(lambda ignored: self.poll(_watch))
1463 def _check2(ignored):
1464 ss.bucket_counter.cpu_slice = orig_cpu_slice
1465 s = ss.bucket_counter.get_state()
1466 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1467 self.failIf("bogusprefix!" in s["storage-index-samples"],
1468 s["storage-index-samples"].keys())
1469 d.addCallback(_check2)
1472 def test_bucket_counter_eta(self):
1473 basedir = "storage/BucketCounter/bucket_counter_eta"
1474 fileutil.make_dirs(basedir)
1475 ss = MyStorageServer(basedir, "\x00" * 20)
1476 ss.bucket_counter.slow_start = 0
1477 # these will be fired inside finished_prefix()
1478 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1479 w = StorageStatus(ss)
1481 d = defer.Deferred()
1483 def _check_1(ignored):
1484 # no ETA is available yet
1485 html = w.renderSynchronously()
1486 s = remove_tags(html)
1487 self.failUnlessIn("complete (next work", s)
1489 def _check_2(ignored):
1490 # one prefix has finished, so an ETA based upon that elapsed time
1491 # should be available.
1492 html = w.renderSynchronously()
1493 s = remove_tags(html)
1494 self.failUnlessIn("complete (ETA ", s)
1496 def _check_3(ignored):
1497 # two prefixes have finished
1498 html = w.renderSynchronously()
1499 s = remove_tags(html)
1500 self.failUnlessIn("complete (ETA ", s)
1503 hooks[0].addCallback(_check_1).addErrback(d.errback)
1504 hooks[1].addCallback(_check_2).addErrback(d.errback)
1505 hooks[2].addCallback(_check_3).addErrback(d.errback)
1507 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    # One-shot switch: while True, the first processed bucket drives
    # cpu_slice negative so the crawler yields immediately afterwards.
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        """Process one bucket, then force an immediate yield if armed."""
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # disarm, and make the crawler give up its timeslice right away
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        """Once the one-shot stop has fired, restore a generous timeslice."""
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
# NOTE(review): both classes below are fragmentary in this chunk.
# BrokenStatResults' body (orig 1522, presumably 'pass') is missing, as
# are the stat-wrapper's def header (orig 1524-1525) and the 'continue'
# lines (orig 1529, 1531) and final return (orig 1533).
1521 class BrokenStatResults:
# Crawler variant whose stat results deliberately omit st_blocks, to
# exercise the "platform without st_blocks" code path: it copies every
# non-underscore stat attribute except st_blocks onto a BrokenStatResults.
1523 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1526 bsr = BrokenStatResults()
1527 for attrname in dir(s):
1528 if attrname.startswith("_"):
1530 if attrname == "st_blocks":
1532 setattr(bsr, attrname, getattr(s, attrname))
# StorageServer whose lease crawler is the instrumented variant, so
# tests can pause right after the first bucket is processed.
1535 class InstrumentedStorageServer(StorageServer):
1536 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
# StorageServer whose lease crawler sees stat results without st_blocks,
# simulating platforms that do not provide that field.
1537 class No_ST_BLOCKS_StorageServer(StorageServer):
1538 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1540 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# Tests for the lease-checking/expiration crawler. Runs each server
# under a fresh MultiService per test.
# NOTE(review): the setUp (orig 1542) and tearDown (orig 1545) def
# headers are missing from this chunk; only their bodies are visible.
1543 self.s = service.MultiService()
1544 self.s.startService()
1546 return self.s.stopService()
1548 def make_shares(self, ss):
# Populate the server with four single-share buckets: two immutable
# (si_0, si_1) and two mutable (si_2, si_3); si_1 and si_3 each get one
# extra lease. Records the storage indexes and all renew/cancel secrets
# on self for later assertions.
# NOTE(review): the inner 'def make(si):' header (orig 1549) and a few
# other lines (e.g. sharenums/remote_write close calls around orig
# 1571-1580) are missing from this chunk.
1550 return (si, hashutil.tagged_hash("renew", si),
1551 hashutil.tagged_hash("cancel", si))
1552 def make_mutable(si):
1553 return (si, hashutil.tagged_hash("renew", si),
1554 hashutil.tagged_hash("cancel", si),
1555 hashutil.tagged_hash("write-enabler", si))
1556 def make_extra_lease(si, num):
1557 return (hashutil.tagged_hash("renew-%d" % num, si),
1558 hashutil.tagged_hash("cancel-%d" % num, si))
1560 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1561 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1562 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1563 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1564 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1565 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1567 canary = FakeCanary()
1568 # note: 'tahoe debug dump-share' will not handle this file, since the
1569 # inner contents are not a valid CHK share
1570 data = "\xff" * 1000
1572 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1574 w[0].remote_write(0, data)
1577 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1579 w[0].remote_write(0, data)
1581 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1583 writev = ss.remote_slot_testv_and_readv_and_writev
1584 writev(mutable_si_2, (we2, rs2, cs2),
1585 {0: ([], [(0,data)], len(data))}, [])
1586 writev(mutable_si_3, (we3, rs3, cs3),
1587 {0: ([], [(0,data)], len(data))}, [])
1588 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1590 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1591 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1592 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1594 def test_basic(self):
# With expiration disabled, run the lease crawler through its first
# bucket and then a whole cycle: check the crawler state dict and the
# StorageStatus HTML/JSON at each stage, and verify that no leases or
# shares were removed.
# NOTE(review): this chunk is missing occasional original lines (e.g.
# lc.slow_start assignment near orig 1600-1601, the open() before orig
# 1614, f.close() after it, the _wait def header at orig 1692, and
# 's = lc.get_state()' before orig 1698); treat as incomplete.
1595 basedir = "storage/LeaseCrawler/basic"
1596 fileutil.make_dirs(basedir)
1597 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1598 # make it start sooner than usual.
1599 lc = ss.lease_checker
1602 lc.stop_after_first_bucket = True
1603 webstatus = StorageStatus(ss)
1605 # create a few shares, with some leases on them
1606 self.make_shares(ss)
1607 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1609 # add a non-sharefile to exercise another code path
1610 fn = os.path.join(ss.sharedir,
1611 storage_index_to_dir(immutable_si_0),
1614 f.write("I am not a share.\n")
1617 # this is before the crawl has started, so we're not in a cycle yet
1618 initial_state = lc.get_state()
1619 self.failIf(lc.get_progress()["cycle-in-progress"])
1620 self.failIfIn("cycle-to-date", initial_state)
1621 self.failIfIn("estimated-remaining-cycle", initial_state)
1622 self.failIfIn("estimated-current-cycle", initial_state)
1623 self.failUnlessIn("history", initial_state)
1624 self.failUnlessEqual(initial_state["history"], {})
1626 ss.setServiceParent(self.s)
1630 d = fireEventually()
1632 # now examine the state right after the first bucket has been
1634 def _after_first_bucket(ignored):
1635 initial_state = lc.get_state()
1636 self.failUnlessIn("cycle-to-date", initial_state)
1637 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1638 self.failUnlessIn("estimated-current-cycle", initial_state)
1639 self.failUnlessIn("history", initial_state)
1640 self.failUnlessEqual(initial_state["history"], {})
1642 so_far = initial_state["cycle-to-date"]
1643 self.failUnlessEqual(so_far["expiration-enabled"], False)
1644 self.failUnlessIn("configured-expiration-mode", so_far)
1645 self.failUnlessIn("lease-age-histogram", so_far)
1646 lah = so_far["lease-age-histogram"]
1647 self.failUnlessEqual(type(lah), list)
1648 self.failUnlessEqual(len(lah), 1)
1649 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1650 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1651 self.failUnlessEqual(so_far["corrupt-shares"], [])
# after exactly one bucket: one bucket/share examined, nothing recovered
1652 sr1 = so_far["space-recovered"]
1653 self.failUnlessEqual(sr1["examined-buckets"], 1)
1654 self.failUnlessEqual(sr1["examined-shares"], 1)
1655 self.failUnlessEqual(sr1["actual-shares"], 0)
1656 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1657 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1658 left = initial_state["estimated-remaining-cycle"]
1659 sr2 = left["space-recovered"]
1660 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1661 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1662 self.failIfEqual(sr2["actual-shares"], None)
1663 self.failIfEqual(sr2["configured-diskbytes"], None)
1664 self.failIfEqual(sr2["original-sharebytes"], None)
1665 d.addCallback(_after_first_bucket)
1666 d.addCallback(lambda ign: self.render1(webstatus))
1667 def _check_html_in_cycle(html):
1668 s = remove_tags(html)
1669 self.failUnlessIn("So far, this cycle has examined "
1670 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1671 self.failUnlessIn("and has recovered: "
1672 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1673 "0 B (0 B / 0 B)", s)
1674 self.failUnlessIn("If expiration were enabled, "
1675 "we would have recovered: "
1676 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1677 " 0 B (0 B / 0 B) by now", s)
1678 self.failUnlessIn("and the remainder of this cycle "
1679 "would probably recover: "
1680 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1681 " 0 B (0 B / 0 B)", s)
1682 self.failUnlessIn("and the whole cycle would probably recover: "
1683 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1684 " 0 B (0 B / 0 B)", s)
1685 self.failUnlessIn("if we were strictly using each lease's default "
1686 "31-day lease lifetime", s)
1687 self.failUnlessIn("this cycle would be expected to recover: ", s)
1688 d.addCallback(_check_html_in_cycle)
1690 # wait for the crawler to finish the first cycle. Nothing should have
1693 return bool(lc.get_state()["last-cycle-finished"] is not None)
1694 d.addCallback(lambda ign: self.poll(_wait))
1696 def _after_first_cycle(ignored):
1698 self.failIf("cycle-to-date" in s)
1699 self.failIf("estimated-remaining-cycle" in s)
1700 self.failIf("estimated-current-cycle" in s)
1701 last = s["history"][0]
1702 self.failUnlessIn("cycle-start-finish-times", last)
1703 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1704 self.failUnlessEqual(last["expiration-enabled"], False)
1705 self.failUnlessIn("configured-expiration-mode", last)
1707 self.failUnlessIn("lease-age-histogram", last)
1708 lah = last["lease-age-histogram"]
1709 self.failUnlessEqual(type(lah), list)
1710 self.failUnlessEqual(len(lah), 1)
1711 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1713 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1714 self.failUnlessEqual(last["corrupt-shares"], [])
# full cycle: 4 buckets / 4 shares examined, nothing recovered anywhere
1716 rec = last["space-recovered"]
1717 self.failUnlessEqual(rec["examined-buckets"], 4)
1718 self.failUnlessEqual(rec["examined-shares"], 4)
1719 self.failUnlessEqual(rec["actual-buckets"], 0)
1720 self.failUnlessEqual(rec["original-buckets"], 0)
1721 self.failUnlessEqual(rec["configured-buckets"], 0)
1722 self.failUnlessEqual(rec["actual-shares"], 0)
1723 self.failUnlessEqual(rec["original-shares"], 0)
1724 self.failUnlessEqual(rec["configured-shares"], 0)
1725 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1726 self.failUnlessEqual(rec["original-diskbytes"], 0)
1727 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1728 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1729 self.failUnlessEqual(rec["original-sharebytes"], 0)
1730 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1732 def _get_sharefile(si):
1733 return list(ss._iter_share_files(si))[0]
1734 def count_leases(si):
1735 return len(list(_get_sharefile(si).get_leases()))
1736 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1737 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1738 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1739 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1740 d.addCallback(_after_first_cycle)
1741 d.addCallback(lambda ign: self.render1(webstatus))
1742 def _check_html(html):
1743 s = remove_tags(html)
1744 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1745 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1746 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1747 "(2 mutable / 2 immutable),", s)
1748 self.failUnlessIn("but expiration was not enabled", s)
1749 d.addCallback(_check_html)
1750 d.addCallback(lambda ign: self.render_json(webstatus))
1751 def _check_json(json):
1752 data = simplejson.loads(json)
1753 self.failUnlessIn("lease-checker", data)
1754 self.failUnlessIn("lease-checker-progress", data)
1755 d.addCallback(_check_json)
1758 def backdate_lease(self, sf, renew_secret, new_expire_time):
# Force a lease's expiration_time backwards by rewriting its record in
# the share file directly; raises IndexError if no lease matches
# renew_secret.
1759 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1760 # "renew" a lease with a new_expire_time that is older than what the
1761 # current lease has), so we have to reach inside it.
1762 for i,lease in enumerate(sf.get_leases()):
1763 if lease.renew_secret == renew_secret:
1764 lease.expiration_time = new_expire_time
1765 f = open(sf.home, 'rb+')
1766 sf._write_lease_record(f, i, lease)
# NOTE(review): the f.close() and early return after the write (orig
# 1767-1768) are missing from this chunk; without the return, the final
# raise would be unconditional.
1769 raise IndexError("unable to renew non-existent lease")
1771 def test_expire_age(self):
1772 basedir = "storage/LeaseCrawler/expire_age"
1773 fileutil.make_dirs(basedir)
1774 # setting expiration_time to 2000 means that any lease which is more
1775 # than 2000s old will be expired.
1776 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1777 expiration_enabled=True,
1778 expiration_mode="age",
1779 expiration_override_lease_duration=2000)
1780 # make it start sooner than usual.
1781 lc = ss.lease_checker
1783 lc.stop_after_first_bucket = True
1784 webstatus = StorageStatus(ss)
1786 # create a few shares, with some leases on them
1787 self.make_shares(ss)
1788 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1790 def count_shares(si):
1791 return len(list(ss._iter_share_files(si)))
1792 def _get_sharefile(si):
1793 return list(ss._iter_share_files(si))[0]
1794 def count_leases(si):
1795 return len(list(_get_sharefile(si).get_leases()))
1797 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1798 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1799 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1800 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1801 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1802 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1803 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1804 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1806 # artificially crank back the expiration time on the first lease of
1807 # each share, to make it look like it expired already (age=1000s).
1808 # Some shares have an extra lease which is set to expire at the
1809 # default time in 31 days from now (age=31days). We then run the
1810 # crawler, which will expire the first lease, making some shares get
1811 # deleted and others stay alive (with one remaining lease)
1814 sf0 = _get_sharefile(immutable_si_0)
1815 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1816 sf0_size = os.stat(sf0.home).st_size
1818 # immutable_si_1 gets an extra lease
1819 sf1 = _get_sharefile(immutable_si_1)
1820 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1822 sf2 = _get_sharefile(mutable_si_2)
1823 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1824 sf2_size = os.stat(sf2.home).st_size
1826 # mutable_si_3 gets an extra lease
1827 sf3 = _get_sharefile(mutable_si_3)
1828 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1830 ss.setServiceParent(self.s)
1832 d = fireEventually()
1833 # examine the state right after the first bucket has been processed
1834 def _after_first_bucket(ignored):
1835 p = lc.get_progress()
1836 self.failUnless(p["cycle-in-progress"], p)
1837 d.addCallback(_after_first_bucket)
1838 d.addCallback(lambda ign: self.render1(webstatus))
1839 def _check_html_in_cycle(html):
1840 s = remove_tags(html)
1841 # the first bucket encountered gets deleted, and its prefix
1842 # happens to be about 1/5th of the way through the ring, so the
1843 # predictor thinks we'll have 5 shares and that we'll delete them
1844 # all. This part of the test depends upon the SIs landing right
1845 # where they do now.
1846 self.failUnlessIn("The remainder of this cycle is expected to "
1847 "recover: 4 shares, 4 buckets", s)
1848 self.failUnlessIn("The whole cycle is expected to examine "
1849 "5 shares in 5 buckets and to recover: "
1850 "5 shares, 5 buckets", s)
1851 d.addCallback(_check_html_in_cycle)
1853 # wait for the crawler to finish the first cycle. Two shares should
1856 return bool(lc.get_state()["last-cycle-finished"] is not None)
1857 d.addCallback(lambda ign: self.poll(_wait))
1859 def _after_first_cycle(ignored):
1860 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1861 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1862 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1863 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1864 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1865 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1868 last = s["history"][0]
1870 self.failUnlessEqual(last["expiration-enabled"], True)
1871 self.failUnlessEqual(last["configured-expiration-mode"],
1872 ("age", 2000, None, ("mutable", "immutable")))
1873 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1875 rec = last["space-recovered"]
1876 self.failUnlessEqual(rec["examined-buckets"], 4)
1877 self.failUnlessEqual(rec["examined-shares"], 4)
1878 self.failUnlessEqual(rec["actual-buckets"], 2)
1879 self.failUnlessEqual(rec["original-buckets"], 2)
1880 self.failUnlessEqual(rec["configured-buckets"], 2)
1881 self.failUnlessEqual(rec["actual-shares"], 2)
1882 self.failUnlessEqual(rec["original-shares"], 2)
1883 self.failUnlessEqual(rec["configured-shares"], 2)
1884 size = sf0_size + sf2_size
1885 self.failUnlessEqual(rec["actual-sharebytes"], size)
1886 self.failUnlessEqual(rec["original-sharebytes"], size)
1887 self.failUnlessEqual(rec["configured-sharebytes"], size)
1888 # different platforms have different notions of "blocks used by
1889 # this file", so merely assert that it's a number
1890 self.failUnless(rec["actual-diskbytes"] >= 0,
1891 rec["actual-diskbytes"])
1892 self.failUnless(rec["original-diskbytes"] >= 0,
1893 rec["original-diskbytes"])
1894 self.failUnless(rec["configured-diskbytes"] >= 0,
1895 rec["configured-diskbytes"])
1896 d.addCallback(_after_first_cycle)
1897 d.addCallback(lambda ign: self.render1(webstatus))
1898 def _check_html(html):
1899 s = remove_tags(html)
1900 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1901 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1902 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1903 d.addCallback(_check_html)
# Verify "cutoff-date" GC mode: leases last renewed before the configured
# cutoff are expired; shares whose only lease is that old get deleted.
# NOTE(review): this excerpt is a number-prefixed paste with lines missing;
# gaps are flagged inline where a referenced name's definition was lost.
1906 def test_expire_cutoff_date(self):
1907 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1908 fileutil.make_dirs(basedir)
1909 # setting cutoff-date to 2000 seconds ago means that any lease which
1910 # is more than 2000s old will be expired.
# NOTE(review): `now` is used below but its assignment (orig. line 1911,
# presumably `now = time.time()`) is missing from this excerpt — confirm.
1912 then = int(now - 2000)
1913 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1914 expiration_enabled=True,
1915 expiration_mode="cutoff-date",
1916 expiration_cutoff_date=then)
1917 # make it start sooner than usual.
1918 lc = ss.lease_checker
# NOTE(review): orig. line 1919 (likely `lc.slow_start = 0`) missing.
1920 lc.stop_after_first_bucket = True
1921 webstatus = StorageStatus(ss)
1923 # create a few shares, with some leases on them
1924 self.make_shares(ss)
1925 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# Helpers over the server's share iterator: share count, first share file,
# and lease count for a given storage index.
1927 def count_shares(si):
1928 return len(list(ss._iter_share_files(si)))
1929 def _get_sharefile(si):
1930 return list(ss._iter_share_files(si))[0]
1931 def count_leases(si):
1932 return len(list(_get_sharefile(si).get_leases()))
# Baseline: four buckets, one share each; si_1 and si_3 carry two leases.
1934 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1935 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1936 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1937 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1938 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1939 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1940 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1941 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1943 # artificially crank back the expiration time on the first lease of
1944 # each share, to make it look like was renewed 3000s ago. To achieve
1945 # this, we need to set the expiration time to now-3000+31days. This
1946 # will change when the lease format is improved to contain both
1947 # create/renew time and duration.
1948 new_expiration_time = now - 3000 + 31*24*60*60
1950 # Some shares have an extra lease which is set to expire at the
1951 # default time in 31 days from now (age=31days). We then run the
1952 # crawler, which will expire the first lease, making some shares get
1953 # deleted and others stay alive (with one remaining lease)
1955 sf0 = _get_sharefile(immutable_si_0)
1956 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1957 sf0_size = os.stat(sf0.home).st_size
1959 # immutable_si_1 gets an extra lease
1960 sf1 = _get_sharefile(immutable_si_1)
1961 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1963 sf2 = _get_sharefile(mutable_si_2)
1964 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1965 sf2_size = os.stat(sf2.home).st_size
1967 # mutable_si_3 gets an extra lease
1968 sf3 = _get_sharefile(mutable_si_3)
1969 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1971 ss.setServiceParent(self.s)
1973 d = fireEventually()
1974 # examine the state right after the first bucket has been processed
1975 def _after_first_bucket(ignored):
1976 p = lc.get_progress()
1977 self.failUnless(p["cycle-in-progress"], p)
1978 d.addCallback(_after_first_bucket)
1979 d.addCallback(lambda ign: self.render1(webstatus))
1980 def _check_html_in_cycle(html):
1981 s = remove_tags(html)
1982 # the first bucket encountered gets deleted, and its prefix
1983 # happens to be about 1/5th of the way through the ring, so the
1984 # predictor thinks we'll have 5 shares and that we'll delete them
1985 # all. This part of the test depends upon the SIs landing right
1986 # where they do now.
1987 self.failUnlessIn("The remainder of this cycle is expected to "
1988 "recover: 4 shares, 4 buckets", s)
1989 self.failUnlessIn("The whole cycle is expected to examine "
1990 "5 shares in 5 buckets and to recover: "
1991 "5 shares, 5 buckets", s)
1992 d.addCallback(_check_html_in_cycle)
1994 # wait for the crawler to finish the first cycle. Two shares should
# NOTE(review): the `def _wait():` header (orig. lines 1995-1996) is
# missing from this excerpt; the next line is its body.
1997 return bool(lc.get_state()["last-cycle-finished"] is not None)
1998 d.addCallback(lambda ign: self.poll(_wait))
# After the full cycle: si_0 and si_2 (single, backdated lease) deleted;
# si_1 and si_3 survive with one remaining (fresh) lease each.
2000 def _after_first_cycle(ignored):
2001 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2002 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2003 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2004 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2005 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2006 self.failUnlessEqual(count_leases(mutable_si_3), 1)
# NOTE(review): `s` is read below but its assignment (orig. ~line 2008,
# presumably `s = lc.get_state()`) is missing from this excerpt.
2009 last = s["history"][0]
2011 self.failUnlessEqual(last["expiration-enabled"], True)
2012 self.failUnlessEqual(last["configured-expiration-mode"],
2013 ("cutoff-date", None, then,
2014 ("mutable", "immutable")))
2015 self.failUnlessEqual(last["leases-per-share-histogram"],
# NOTE(review): the expected histogram literal (orig. lines 2016-2017)
# is missing from this excerpt.
2018 rec = last["space-recovered"]
2019 self.failUnlessEqual(rec["examined-buckets"], 4)
2020 self.failUnlessEqual(rec["examined-shares"], 4)
# "original-*" counters are 0 in cutoff-date mode (no original lease ages
# to compare), while "actual-"/"configured-" reflect the two deletions.
2021 self.failUnlessEqual(rec["actual-buckets"], 2)
2022 self.failUnlessEqual(rec["original-buckets"], 0)
2023 self.failUnlessEqual(rec["configured-buckets"], 2)
2024 self.failUnlessEqual(rec["actual-shares"], 2)
2025 self.failUnlessEqual(rec["original-shares"], 0)
2026 self.failUnlessEqual(rec["configured-shares"], 2)
2027 size = sf0_size + sf2_size
2028 self.failUnlessEqual(rec["actual-sharebytes"], size)
2029 self.failUnlessEqual(rec["original-sharebytes"], 0)
2030 self.failUnlessEqual(rec["configured-sharebytes"], size)
2031 # different platforms have different notions of "blocks used by
2032 # this file", so merely assert that it's a number
2033 self.failUnless(rec["actual-diskbytes"] >= 0,
2034 rec["actual-diskbytes"])
2035 self.failUnless(rec["original-diskbytes"] >= 0,
2036 rec["original-diskbytes"])
2037 self.failUnless(rec["configured-diskbytes"] >= 0,
2038 rec["configured-diskbytes"])
2039 d.addCallback(_after_first_cycle)
2040 d.addCallback(lambda ign: self.render1(webstatus))
2041 def _check_html(html):
2042 s = remove_tags(html)
2043 self.failUnlessIn("Expiration Enabled:"
2044 " expired leases will be removed", s)
2045 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2046 substr = "Leases created or last renewed before %s will be considered expired." % date
2047 self.failUnlessIn(substr, s)
2048 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2049 d.addCallback(_check_html)
# NOTE(review): trailing `return d` (orig. line 2050) missing from excerpt.
# Verify expiration_sharetypes=("immutable",): with every lease expirable,
# only the immutable shares are deleted; mutable shares keep all leases.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2052 def test_only_immutable(self):
2053 basedir = "storage/LeaseCrawler/only_immutable"
2054 fileutil.make_dirs(basedir)
# NOTE(review): `now` is used below but its assignment (orig. line 2055,
# presumably `now = time.time()`) is missing from this excerpt.
2056 then = int(now - 2000)
2057 ss = StorageServer(basedir, "\x00" * 20,
2058 expiration_enabled=True,
2059 expiration_mode="cutoff-date",
2060 expiration_cutoff_date=then,
2061 expiration_sharetypes=("immutable",))
2062 lc = ss.lease_checker
# NOTE(review): orig. line 2063 (likely crawler speed-up knobs) missing.
2064 webstatus = StorageStatus(ss)
2066 self.make_shares(ss)
2067 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2068 # set all leases to be expirable
2069 new_expiration_time = now - 3000 + 31*24*60*60
# Helpers: share count, first share file, lease count per storage index.
2071 def count_shares(si):
2072 return len(list(ss._iter_share_files(si)))
2073 def _get_sharefile(si):
2074 return list(ss._iter_share_files(si))[0]
2075 def count_leases(si):
2076 return len(list(_get_sharefile(si).get_leases()))
# Backdate every lease on every share so all are past the cutoff.
2078 sf0 = _get_sharefile(immutable_si_0)
2079 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2080 sf1 = _get_sharefile(immutable_si_1)
2081 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2082 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2083 sf2 = _get_sharefile(mutable_si_2)
2084 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2085 sf3 = _get_sharefile(mutable_si_3)
2086 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2087 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2089 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (orig. line 2090) is missing;
# the next line is its body.
2091 return bool(lc.get_state()["last-cycle-finished"] is not None)
2092 d = self.poll(_wait)
# Only immutable shares were expired; mutable ones are untouched.
2094 def _after_first_cycle(ignored):
2095 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2096 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2097 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2098 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2099 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2100 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2101 d.addCallback(_after_first_cycle)
2102 d.addCallback(lambda ign: self.render1(webstatus))
2103 def _check_html(html):
2104 s = remove_tags(html)
2105 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2106 d.addCallback(_check_html)
# NOTE(review): trailing `return d` (orig. ~line 2107) missing from excerpt.
# Mirror of test_only_immutable for expiration_sharetypes=("mutable",):
# only mutable shares are deleted; immutable shares keep all leases.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2109 def test_only_mutable(self):
2110 basedir = "storage/LeaseCrawler/only_mutable"
2111 fileutil.make_dirs(basedir)
# NOTE(review): `now` is used below but its assignment (orig. line 2112,
# presumably `now = time.time()`) is missing from this excerpt.
2113 then = int(now - 2000)
2114 ss = StorageServer(basedir, "\x00" * 20,
2115 expiration_enabled=True,
2116 expiration_mode="cutoff-date",
2117 expiration_cutoff_date=then,
2118 expiration_sharetypes=("mutable",))
2119 lc = ss.lease_checker
# NOTE(review): orig. line 2120 (likely crawler speed-up knobs) missing.
2121 webstatus = StorageStatus(ss)
2123 self.make_shares(ss)
2124 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2125 # set all leases to be expirable
2126 new_expiration_time = now - 3000 + 31*24*60*60
# Helpers: share count, first share file, lease count per storage index.
2128 def count_shares(si):
2129 return len(list(ss._iter_share_files(si)))
2130 def _get_sharefile(si):
2131 return list(ss._iter_share_files(si))[0]
2132 def count_leases(si):
2133 return len(list(_get_sharefile(si).get_leases()))
# Backdate every lease on every share so all are past the cutoff.
2135 sf0 = _get_sharefile(immutable_si_0)
2136 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2137 sf1 = _get_sharefile(immutable_si_1)
2138 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2139 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2140 sf2 = _get_sharefile(mutable_si_2)
2141 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2142 sf3 = _get_sharefile(mutable_si_3)
2143 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2144 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2146 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (orig. line 2147) is missing;
# the next line is its body.
2148 return bool(lc.get_state()["last-cycle-finished"] is not None)
2149 d = self.poll(_wait)
# Only mutable shares were expired; immutable ones are untouched.
2151 def _after_first_cycle(ignored):
2152 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2153 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2154 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2155 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2156 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2157 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2158 d.addCallback(_after_first_cycle)
2159 d.addCallback(lambda ign: self.render1(webstatus))
2160 def _check_html(html):
2161 s = remove_tags(html)
2162 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2163 d.addCallback(_check_html)
# NOTE(review): trailing `return d` (orig. ~line 2164) missing from excerpt.
def test_bad_mode(self):
    """A bogus expiration_mode= must make StorageServer raise ValueError
    with a message naming the two legal modes."""
    base = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(base)
    err = self.failUnlessRaises(ValueError,
                                StorageServer, base, "\x00" * 20,
                                expiration_mode="bogus")
    self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'",
                      str(err))
def test_parse_duration(self):
    """Exercise time_format.parse_duration for each supported unit suffix.

    Fix: the DAY/MONTH/YEAR constants used by the assertions were missing
    from this block (lost lines 2175-2177), which would raise NameError.
    They are restored here; the products they imply (7days == 7*DAY, etc.)
    match parse_duration's convention of a 31-day month and 365-day year.
    """
    DAY = 24*60*60
    MONTH = 31*DAY   # parse_duration treats "mo"/"month" as 31 days
    YEAR = 365*DAY   # and "year"/"years" as 365 days
    p = time_format.parse_duration
    self.failUnlessEqual(p("7days"), 7*DAY)
    self.failUnlessEqual(p("31day"), 31*DAY)
    self.failUnlessEqual(p("60 days"), 60*DAY)
    self.failUnlessEqual(p("2mo"), 2*MONTH)
    self.failUnlessEqual(p("3 month"), 3*MONTH)
    self.failUnlessEqual(p("2years"), 2*YEAR)
    # an unrecognized unit must be rejected with a descriptive error
    e = self.failUnlessRaises(ValueError, p, "2kumquats")
    self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'",
                      str(e))
def test_parse_date(self):
    """time_format.parse_date turns an ISO-8601 date string into an
    integer UTC timestamp."""
    parse = time_format.parse_date
    parsed = parse("2009-03-18")
    self.failUnless(isinstance(parsed, int), parsed)
    self.failUnlessEqual(parse("2009-03-18"), 1237334400)
# Verify the lease-checker keeps only a bounded history: after 15+ cycles,
# exactly the 10 most recent cycle records (6..15) remain.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2193 def test_limited_history(self):
2194 basedir = "storage/LeaseCrawler/limited_history"
2195 fileutil.make_dirs(basedir)
2196 ss = StorageServer(basedir, "\x00" * 20)
2197 # make it start sooner than usual.
2198 lc = ss.lease_checker
# NOTE(review): orig. lines 2199-2201 (crawler speed-up knobs, e.g.
# `lc.slow_start = 0` / `lc.cpu_slice = ...`) missing from this excerpt.
2202 # create a few shares, with some leases on them
2203 self.make_shares(ss)
2205 ss.setServiceParent(self.s)
# Poll until at least 15 full crawler cycles have completed.
2207 def _wait_until_15_cycles_done():
2208 last = lc.state["last-cycle-finished"]
2209 if last is not None and last >= 15:
# NOTE(review): orig. lines 2210-2213 (the `return True` branch plus the
# timer-nudge / `return False` fallthrough) missing from this excerpt.
2214 d = self.poll(_wait_until_15_cycles_done)
2216 def _check(ignored):
# NOTE(review): `h` is read below but its assignment (orig. lines
# 2217-2218, presumably `s = lc.get_state()` / `h = s["history"]`) is
# missing from this excerpt.
2219 self.failUnlessEqual(len(h), 10)
2220 self.failUnlessEqual(max(h.keys()), 15)
2221 self.failUnlessEqual(min(h.keys()), 6)
2222 d.addCallback(_check)
# NOTE(review): trailing `return d` (orig. ~line 2223) missing from excerpt.
# When the crawler is interrupted before finishing even one prefix, the
# progress measurer cannot extrapolate, so every predicted space-recovered
# counter must be None rather than a bogus estimate.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2225 def test_unpredictable_future(self):
2226 basedir = "storage/LeaseCrawler/unpredictable_future"
2227 fileutil.make_dirs(basedir)
2228 ss = StorageServer(basedir, "\x00" * 20)
2229 # make it start sooner than usual.
2230 lc = ss.lease_checker
# NOTE(review): orig. line 2231 (likely `lc.slow_start = 0`) missing.
2232 lc.cpu_slice = -1.0 # stop quickly
2234 self.make_shares(ss)
2236 ss.setServiceParent(self.s)
2238 d = fireEventually()
2239 def _check(ignored):
2240 # this should fire after the first bucket is complete, but before
2241 # the first prefix is complete, so the progress-measurer won't
2242 # think we've gotten far enough to raise our percent-complete
2243 # above 0%, triggering the cannot-predict-the-future code in
2244 # expirer.py . This will have to change if/when the
2245 # progress-measurer gets smart enough to count buckets (we'll
2246 # have to interrupt it even earlier, before it's finished the
# NOTE(review): end of this comment plus the `s = lc.get_state()`
# assignment (orig. lines 2247-2248) missing; `s` is read below.
2249 self.failUnlessIn("cycle-to-date", s)
2250 self.failUnlessIn("estimated-remaining-cycle", s)
2251 self.failUnlessIn("estimated-current-cycle", s)
# Every "remaining cycle" prediction must be None (can't predict).
2253 left = s["estimated-remaining-cycle"]["space-recovered"]
2254 self.failUnlessEqual(left["actual-buckets"], None)
2255 self.failUnlessEqual(left["original-buckets"], None)
2256 self.failUnlessEqual(left["configured-buckets"], None)
2257 self.failUnlessEqual(left["actual-shares"], None)
2258 self.failUnlessEqual(left["original-shares"], None)
2259 self.failUnlessEqual(left["configured-shares"], None)
2260 self.failUnlessEqual(left["actual-diskbytes"], None)
2261 self.failUnlessEqual(left["original-diskbytes"], None)
2262 self.failUnlessEqual(left["configured-diskbytes"], None)
2263 self.failUnlessEqual(left["actual-sharebytes"], None)
2264 self.failUnlessEqual(left["original-sharebytes"], None)
2265 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): apparent copy-paste bug — `full` re-reads
# "estimated-remaining-cycle"; given the variable name and the
# assertion on line 2251, this was probably meant to be
# "estimated-current-cycle". Worth fixing upstream.
2267 full = s["estimated-remaining-cycle"]["space-recovered"]
2268 self.failUnlessEqual(full["actual-buckets"], None)
2269 self.failUnlessEqual(full["original-buckets"], None)
2270 self.failUnlessEqual(full["configured-buckets"], None)
2271 self.failUnlessEqual(full["actual-shares"], None)
2272 self.failUnlessEqual(full["original-shares"], None)
2273 self.failUnlessEqual(full["configured-shares"], None)
2274 self.failUnlessEqual(full["actual-diskbytes"], None)
2275 self.failUnlessEqual(full["original-diskbytes"], None)
2276 self.failUnlessEqual(full["configured-diskbytes"], None)
2277 self.failUnlessEqual(full["actual-sharebytes"], None)
2278 self.failUnlessEqual(full["original-sharebytes"], None)
2279 self.failUnlessEqual(full["configured-sharebytes"], None)
2281 d.addCallback(_check)
# NOTE(review): trailing `return d` (orig. ~line 2282) missing from excerpt.
# When os.stat() results lack .st_blocks (see No_ST_BLOCKS_StorageServer),
# the expirer must fall back to reporting diskbytes == sharebytes.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2284 def test_no_st_blocks(self):
2285 basedir = "storage/LeaseCrawler/no_st_blocks"
2286 fileutil.make_dirs(basedir)
2287 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2288 expiration_mode="age",
2289 expiration_override_lease_duration=-1000)
2290 # a negative expiration_time= means the "configured-"
2291 # space-recovered counts will be non-zero, since all shares will have
# NOTE(review): end of this comment (orig. lines 2292-2293) missing.
2294 # make it start sooner than usual.
2295 lc = ss.lease_checker
# NOTE(review): orig. lines 2296-2297 (likely `lc.slow_start = 0`) missing.
2298 self.make_shares(ss)
2299 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (orig. line 2300) is missing;
# the next line is its body.
2301 return bool(lc.get_state()["last-cycle-finished"] is not None)
2302 d = self.poll(_wait)
2304 def _check(ignored):
# NOTE(review): `s` is read below but its assignment (orig. line 2305,
# presumably `s = lc.get_state()`) is missing from this excerpt.
2306 last = s["history"][0]
2307 rec = last["space-recovered"]
2308 self.failUnlessEqual(rec["configured-buckets"], 4)
2309 self.failUnlessEqual(rec["configured-shares"], 4)
2310 self.failUnless(rec["configured-sharebytes"] > 0,
2311 rec["configured-sharebytes"])
2312 # without the .st_blocks field in os.stat() results, we should be
2313 # reporting diskbytes==sharebytes
2314 self.failUnlessEqual(rec["configured-sharebytes"],
2315 rec["configured-diskbytes"])
2316 d.addCallback(_check)
# NOTE(review): trailing `return d` (orig. ~line 2317) missing from excerpt.
# A share file with a corrupted header must not stop the lease crawler:
# the bucket is reported under "corrupt-shares" (in state, JSON, and HTML)
# and the cycle continues over the remaining buckets.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2319 def test_share_corruption(self):
# Tell the poll mixin these logged errors are expected, not failures.
2320 self._poll_should_ignore_these_errors = [
2321 UnknownMutableContainerVersionError,
2322 UnknownImmutableContainerVersionError,
# NOTE(review): closing `]` (orig. line 2323) missing from this excerpt.
2324 basedir = "storage/LeaseCrawler/share_corruption"
2325 fileutil.make_dirs(basedir)
2326 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2327 w = StorageStatus(ss)
2328 # make it start sooner than usual.
2329 lc = ss.lease_checker
2330 lc.stop_after_first_bucket = True
# NOTE(review): orig. lines 2331-2333 (likely crawler speed-up knobs)
# missing from this excerpt.
2334 # create a few shares, with some leases on them
2335 self.make_shares(ss)
2337 # now corrupt one, and make sure the lease-checker keeps going
2338 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2339 first = min(self.sis)
2340 first_b32 = base32.b2a(first)
2341 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
# NOTE(review): `f` is used below but the open/seek lines (orig. lines
# 2342-2343, presumably `f = open(fn, "rb+")` / `f.seek(0)`) and the
# matching `f.close()` (orig. line 2345) are missing from this excerpt.
2344 f.write("BAD MAGIC")
2346 # if get_share_file() doesn't see the correct mutable magic, it
2347 # assumes the file is an immutable share, and then
2348 # immutable.ShareFile sees a bad version. So regardless of which kind
2349 # of share we corrupted, this will trigger an
2350 # UnknownImmutableContainerVersionError.
2352 # also create an empty bucket
2353 empty_si = base32.b2a("\x04"*16)
2354 empty_bucket_dir = os.path.join(ss.sharedir,
2355 storage_index_to_dir(empty_si))
2356 fileutil.make_dirs(empty_bucket_dir)
2358 ss.setServiceParent(self.s)
2360 d = fireEventually()
2362 # now examine the state right after the first bucket has been
2364 def _after_first_bucket(ignored):
2365 so_far = lc.get_state()["cycle-to-date"]
2366 rec = so_far["space-recovered"]
2367 self.failUnlessEqual(rec["examined-buckets"], 1)
2368 self.failUnlessEqual(rec["examined-shares"], 0)
2369 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2370 d.addCallback(_after_first_bucket)
2372 d.addCallback(lambda ign: self.render_json(w))
2373 def _check_json(json):
2374 data = simplejson.loads(json)
2375 # grr. json turns all dict keys into strings.
2376 so_far = data["lease-checker"]["cycle-to-date"]
2377 corrupt_shares = so_far["corrupt-shares"]
2378 # it also turns all tuples into lists
2379 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2380 d.addCallback(_check_json)
2381 d.addCallback(lambda ign: self.render1(w))
2382 def _check_html(html):
2383 s = remove_tags(html)
2384 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2385 d.addCallback(_check_html)
# NOTE(review): the `def _wait():` header (orig. lines 2386-2387) is
# missing; the next line is its body.
2388 return bool(lc.get_state()["last-cycle-finished"] is not None)
2389 d.addCallback(lambda ign: self.poll(_wait))
# After a full cycle: all 5 buckets (4 shares + 1 empty) were examined,
# the corrupt share skipped, and the corruption recorded in history.
2391 def _after_first_cycle(ignored):
# NOTE(review): `s` is read below but its assignment (orig. line 2392,
# presumably `s = lc.get_state()`) is missing from this excerpt.
2393 last = s["history"][0]
2394 rec = last["space-recovered"]
2395 self.failUnlessEqual(rec["examined-buckets"], 5)
2396 self.failUnlessEqual(rec["examined-shares"], 3)
2397 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2398 d.addCallback(_after_first_cycle)
2399 d.addCallback(lambda ign: self.render_json(w))
2400 def _check_json_history(json):
2401 data = simplejson.loads(json)
2402 last = data["lease-checker"]["history"]["0"]
2403 corrupt_shares = last["corrupt-shares"]
2404 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2405 d.addCallback(_check_json_history)
2406 d.addCallback(lambda ign: self.render1(w))
2407 def _check_html_history(html):
2408 s = remove_tags(html)
2409 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2410 d.addCallback(_check_html_history)
# NOTE(review): the `def _cleanup(res):` header (orig. lines 2411-2412)
# is missing; the next two lines are its body.
2413 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2414 UnknownImmutableContainerVersionError)
# NOTE(review): trailing `return res` / `d.addBoth(_cleanup)` / `return d`
# (orig. lines 2415-2417) missing from this excerpt.
# Render the given page with ?t=json (machine-readable status).
2419 def render_json(self, page):
2420 d = self.render1(page, args={"t": ["json"]})
# NOTE(review): trailing `return d` (orig. line 2421) missing from excerpt.
class NoDiskStatsServer(StorageServer):
    """A StorageServer whose disk-stats probe is unavailable.

    get_disk_stats raises AttributeError, mimicking a platform with no
    disk-stats API (used by test_status_no_disk_stats).
    """
    def get_disk_stats(self):
        # Behave as if the platform offers no way to measure disk usage.
        raise AttributeError()
# A StorageServer whose disk-stats call exists but fails when invoked
# (used by test_status_bad_disk_stats).
2427 class BadDiskStatsServer(StorageServer):
2428 def get_disk_stats(self):
# NOTE(review): the method body (orig. ~line 2429, presumably raising an
# OSError-style failure) is missing from this excerpt — confirm upstream.
# Web-status rendering tests; the MultiService in self.s hosts the storage
# servers created by the individual tests.
2431 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# NOTE(review): the `def setUp(self):` header (orig. lines 2432-2433) is
# missing from this excerpt; the next two lines are its body.
2434 self.s = service.MultiService()
2435 self.s.startService()
# NOTE(review): the `def tearDown(self):` header (orig. line 2436) is
# missing from this excerpt; the next line is its body.
2437 return self.s.stopService()
def test_no_server(self):
    """A StorageStatus wrapping no server renders the 'not running' page."""
    page = StorageStatus(None)
    rendered = page.renderSynchronously()
    self.failUnlessIn("<h1>No Storage Server Running</h1>", rendered)
# A default (writable, no reservation) server's status page and JSON view
# both report accepting-shares and zero reserved space.
# NOTE(review): number-prefixed paste with missing lines; gaps flagged inline.
2444 def test_status(self):
2445 basedir = "storage/WebStatus/status"
2446 fileutil.make_dirs(basedir)
2447 ss = StorageServer(basedir, "\x00" * 20)
2448 ss.setServiceParent(self.s)
2449 w = StorageStatus(ss)
# NOTE(review): `d` is used below but its creation (orig. line 2450,
# presumably `d = self.render1(w)`) is missing from this excerpt.
2451 def _check_html(html):
2452 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2453 s = remove_tags(html)
2454 self.failUnlessIn("Accepting new shares: Yes", s)
2455 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2456 d.addCallback(_check_html)
2457 d.addCallback(lambda ign: self.render_json(w))
2458 def _check_json(json):
2459 data = simplejson.loads(json)
# NOTE(review): `s` is read below but its assignment (orig. line 2460,
# presumably `s = data["stats"]`) is missing from this excerpt.
2461 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2462 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2463 self.failUnlessIn("bucket-counter", data)
2464 self.failUnlessIn("lease-checker", data)
2465 d.addCallback(_check_json)
# NOTE(review): trailing `return d` (orig. ~line 2466) missing from excerpt.
# Render the given page with ?t=json (machine-readable status).
2468 def render_json(self, page):
2469 d = self.render1(page, args={"t": ["json"]})
# NOTE(review): trailing `return d` (orig. line 2470) missing from excerpt.
def test_status_no_disk_stats(self):
    """Some platforms may have no disk stats API. Make sure the code can
    handle that (test runs on all platforms): the page shows '?' for the
    space figures, shares are still accepted, and get_available_space()
    reports unknown (None)."""
    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = NoDiskStatsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    page = StorageStatus(server)
    html = page.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    text = remove_tags(html)
    for expected in ["Accepting new shares: Yes",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"]:
        self.failUnlessIn(expected, text)
    self.failUnless(server.get_available_space() is None)
def test_status_bad_disk_stats(self):
    """If the API to get disk stats exists but a call to it fails, then
    the status should show that no shares will be accepted, the space
    figures render as '?', and get_available_space() should be 0."""
    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = BadDiskStatsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    page = StorageStatus(server)
    html = page.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    text = remove_tags(html)
    for expected in ["Accepting new shares: No",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"]:
        self.failUnlessIn(expected, text)
    self.failUnlessEqual(server.get_available_space(), 0)
def test_readonly(self):
    """A readonly_storage=True server's status page must say it is not
    accepting new shares."""
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    self.failUnlessIn("Accepting new shares: No", remove_tags(html))
def test_reserved(self):
    """A reserved_space= setting shows up on the status page, both
    abbreviated and exact."""
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)",
                      remove_tags(html))
def test_huge_reserved(self):
    """Same rendering check as test_reserved.

    Fix: this test reused test_reserved's working directory
    ("storage/WebStatus/reserved"), so the two tests shared on-disk state;
    it now gets its own basedir. NOTE(review): despite its name it still
    uses the same 10 MB reservation as test_reserved — presumably it was
    meant to exercise a much larger value; confirm intent upstream.
    """
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    s = remove_tags(html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
def test_util(self):
    """Exercise the space-rendering helpers and remove_prefix directly."""
    page = StorageStatus(None)
    # (input, exact rendering, abbreviated rendering)
    for value, exact, abbrev in [(None, "?", "?"),
                                 (10e6, "10000000", "10.00 MB")]:
        self.failUnlessEqual(page.render_space(None, value), exact)
        self.failUnlessEqual(page.render_abbrev_space(None, value), abbrev)
    # remove_prefix strips a matching prefix, else returns None
    self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
    self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)