2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Prepare the on-disk layout for one Bucket test case.

    Creates storage/Bucket/<name> and its "tmp" subdirectory through
    fileutil.make_dirs, then returns the pair (incoming, final): the
    path "bucket" inside "tmp", and the path "bucket" directly under the
    test directory. Neither returned path is created here.
    """
    root = os.path.join("storage", "Bucket", name)
    staging = os.path.join(root, "tmp")
    # Same creation order as before: the test directory first, then tmp.
    for directory in (root, staging):
        fileutil.make_dirs(directory)
    return os.path.join(staging, "bucket"), os.path.join(root, "bucket")
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnless("to peer" in repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    """Run the shared write-then-read scenario with WriteBucketProxy.

    Passes 0x24 as the header_size argument of _do_test_readwrite and
    returns whatever that helper returns.
    """
    header_size = 0x24
    return self._do_test_readwrite("test_readwrite_v1", header_size,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    """Run the shared write-then-read scenario with WriteBucketProxy_v2.

    Passes 0x44 as the header_size argument of _do_test_readwrite and
    returns whatever that helper returns.
    """
    header_size = 0x44
    return self._do_test_readwrite("test_readwrite_v2", header_size,
                                   WriteBucketProxy_v2, ReadBucketProxy)
230 class FakeDiskStorageServer(StorageServer):
def get_disk_stats(self):
    """Return fake disk statistics computed from self.DISKAVAIL.

    'free_for_nonroot' is DISKAVAIL itself; 'avail' is DISKAVAIL minus
    self.reserved_space, never reported below zero.
    """
    free_bytes = self.DISKAVAIL
    usable = free_bytes - self.reserved_space
    if usable < 0:
        # The reservation exceeds what is free: report nothing available.
        usable = 0
    return {'free_for_nonroot': free_bytes, 'avail': usable}
235 class Server(unittest.TestCase):
238 self.sparent = LoggingServiceParent()
239 self.sparent.startService()
240 self._lease_secret = itertools.count()
242 return self.sparent.stopService()
244 def workdir(self, name):
245 basedir = os.path.join("storage", "Server", name)
248 def create(self, name, reserved_space=0, klass=StorageServer):
249 workdir = self.workdir(name)
250 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
251 stats_provider=FakeStatsProvider())
252 ss.setServiceParent(self.sparent)
255 def test_create(self):
256 self.create("test_create")
258 def allocate(self, ss, storage_index, sharenums, size, canary=None):
259 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
262 canary = FakeCanary()
263 return ss.remote_allocate_buckets(storage_index,
264 renew_secret, cancel_secret,
265 sharenums, size, canary)
def test_large_share(self):
    """Write two bytes just past the 2**32 offset of a share and read them back.

    Allocates share 0 with room for 2**32+2 bytes, writes "ab" at offset
    2**32, closes the bucket, and checks that a reader obtained from the
    server returns the same two bytes from that offset.
    """
    storage_index = "allocate"
    offset = 2**32
    payload = "ab"
    ss = self.create("test_large_share")

    already, writers = self.allocate(ss, storage_index, [0],
                                     offset + len(payload))
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(set(writers.keys()), set([0]))

    shnum, bucket = writers.items()[0]
    # This write hammers the filesystem unless it creates a sparse file
    # for the skipped-over region. :-(
    bucket.remote_write(offset, payload)
    bucket.remote_close()

    reader = ss.remote_get_buckets(storage_index)[shnum]
    self.failUnlessEqual(reader.remote_read(offset, len(payload)), payload)
test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
284 def test_dont_overfill_dirs(self):
286 This test asserts that if you add a second share whose storage index
287 share lots of leading bits with an extant share (but isn't the exact
288 same storage index), this won't add an entry to the share directory.
290 ss = self.create("test_dont_overfill_dirs")
291 already, writers = self.allocate(ss, "storageindex", [0], 10)
292 for i, wb in writers.items():
293 wb.remote_write(0, "%10d" % i)
295 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
297 children_of_storedir = set(os.listdir(storedir))
299 # Now store another one under another storageindex that has leading
300 # chars the same as the first storageindex.
301 already, writers = self.allocate(ss, "storageindey", [0], 10)
302 for i, wb in writers.items():
303 wb.remote_write(0, "%10d" % i)
305 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
307 new_children_of_storedir = set(os.listdir(storedir))
308 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
310 def test_remove_incoming(self):
311 ss = self.create("test_remove_incoming")
312 already, writers = self.allocate(ss, "vid", range(3), 10)
313 for i,wb in writers.items():
314 wb.remote_write(0, "%10d" % i)
316 incoming_share_dir = wb.incominghome
317 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
318 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
319 incoming_dir = os.path.dirname(incoming_prefix_dir)
320 self.failIf(os.path.exists(incoming_bucket_dir))
321 self.failIf(os.path.exists(incoming_prefix_dir))
322 self.failUnless(os.path.exists(incoming_dir))
324 def test_allocate(self):
325 ss = self.create("test_allocate")
327 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 self.failUnless("BucketReader" in b_str, b_str)
349 self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
351 # now if we ask about writing again, the server should offer those
352 # three buckets as already present. It should offer them even if we
353 # don't ask about those specific ones.
354 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355 self.failUnlessEqual(already, set([0,1,2]))
356 self.failUnlessEqual(set(writers.keys()), set([3,4]))
358 # while those two buckets are open for writing, the server should
359 # refuse to offer them to uploaders
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 # aborting the writes should remove the tempfiles
366 for i,wb in writers2.items():
368 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369 self.failUnlessEqual(already2, set([0,1,2]))
370 self.failUnlessEqual(set(writers2.keys()), set([5]))
372 for i,wb in writers2.items():
374 for i,wb in writers.items():
377 def test_bad_container_version(self):
378 ss = self.create("test_bad_container_version")
379 a,w = self.allocate(ss, "si1", [0], 10)
380 w[0].remote_write(0, "\xff"*10)
383 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389 ss.remote_get_buckets("allocate")
391 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392 ss.remote_get_buckets, "si1")
393 self.failUnless(" had version 0 but we wanted 1" in str(e), e)
395 def test_disconnect(self):
396 # simulate a disconnection
397 ss = self.create("test_disconnect")
398 canary = FakeCanary()
399 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400 self.failUnlessEqual(already, set())
401 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402 for (f,args,kwargs) in canary.disconnectors.values():
407 # that ought to delete the incoming shares
408 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409 self.failUnlessEqual(already, set())
410 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412 def test_reserved_space(self):
413 ss = self.create("test_reserved_space", reserved_space=10000,
414 klass=FakeDiskStorageServer)
415 # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
417 # 15k available, 10k reserved, leaves 5k for shares
419 # a newly created and filled share incurs this much overhead, beyond
420 # the size we request.
422 LEASE_SIZE = 4+32+32+4
423 canary = FakeCanary(True)
424 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425 self.failUnlessEqual(len(writers), 3)
426 # now the StorageServer should have 3000 bytes provisionally
427 # allocated, allowing only 2000 more to be claimed
428 self.failUnlessEqual(len(ss._active_writers), 3)
430 # allocating 1001-byte shares only leaves room for one
431 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432 self.failUnlessEqual(len(writers2), 1)
433 self.failUnlessEqual(len(ss._active_writers), 4)
435 # we abandon the first set, so their provisional allocation should be
439 self.failUnlessEqual(len(ss._active_writers), 1)
440 # now we have a provisional allocation of 1001 bytes
442 # and we close the second set, so their provisional allocation should
443 # become real, long-term allocation, and grows to include the
445 for bw in writers2.values():
446 bw.remote_write(0, "a"*25)
451 self.failUnlessEqual(len(ss._active_writers), 0)
453 allocated = 1001 + OVERHEAD + LEASE_SIZE
455 # we have to manually increase DISKAVAIL, since we're not doing real
457 ss.DISKAVAIL -= allocated
459 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462 self.failUnlessEqual(len(writers3), 39)
463 self.failUnlessEqual(len(ss._active_writers), 39)
467 self.failUnlessEqual(len(ss._active_writers), 0)
468 ss.disownServiceParent()
471 def test_disk_stats(self):
472 # This will spuriously fail if there is zero disk space left (but so will other tests).
473 ss = self.create("test_disk_stats", reserved_space=0)
475 disk = ss.get_disk_stats()
476 self.failUnless(disk['total'] > 0, disk['total'])
477 self.failUnless(disk['used'] > 0, disk['used'])
478 self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
479 self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
480 self.failUnless(disk['avail'] > 0, disk['avail'])
482 def test_disk_stats_avail_nonnegative(self):
483 ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)
485 disk = ss.get_disk_stats()
486 self.failUnlessEqual(disk['avail'], 0)
489 basedir = self.workdir("test_seek_behavior")
490 fileutil.make_dirs(basedir)
491 filename = os.path.join(basedir, "testfile")
492 f = open(filename, "wb")
495 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
496 # files. mode="a" preserves previous contents but does not allow
497 # seeking-to-create-holes. mode="r+" allows both.
498 f = open(filename, "rb+")
502 filelen = os.stat(filename)[stat.ST_SIZE]
503 self.failUnlessEqual(filelen, 100+3)
504 f2 = open(filename, "rb")
505 self.failUnlessEqual(f2.read(5), "start")
508 def test_leases(self):
509 ss = self.create("test_leases")
510 canary = FakeCanary()
514 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
515 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
516 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
517 sharenums, size, canary)
518 self.failUnlessEqual(len(already), 0)
519 self.failUnlessEqual(len(writers), 5)
520 for wb in writers.values():
523 leases = list(ss.get_leases("si0"))
524 self.failUnlessEqual(len(leases), 1)
525 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
527 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
528 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
529 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
530 sharenums, size, canary)
531 for wb in writers.values():
534 # take out a second lease on si1
535 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
536 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
537 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
538 sharenums, size, canary)
539 self.failUnlessEqual(len(already), 5)
540 self.failUnlessEqual(len(writers), 0)
542 leases = list(ss.get_leases("si1"))
543 self.failUnlessEqual(len(leases), 2)
544 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
546 # and a third lease, using add-lease
547 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
548 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
549 ss.remote_add_lease("si1", rs2a, cs2a)
550 leases = list(ss.get_leases("si1"))
551 self.failUnlessEqual(len(leases), 3)
552 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
554 # add-lease on a missing storage index is silently ignored
555 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
557 # check that si0 is readable
558 readers = ss.remote_get_buckets("si0")
559 self.failUnlessEqual(len(readers), 5)
561 # renew the first lease. Only the proper renew_secret should work
562 ss.remote_renew_lease("si0", rs0)
563 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
564 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
566 # check that si0 is still readable
567 readers = ss.remote_get_buckets("si0")
568 self.failUnlessEqual(len(readers), 5)
571 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
572 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
573 ss.remote_cancel_lease("si0", cs0)
575 # si0 should now be gone
576 readers = ss.remote_get_buckets("si0")
577 self.failUnlessEqual(len(readers), 0)
578 # and the renew should no longer work
579 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
582 # cancel the first lease on si1, leaving the second and third in place
583 ss.remote_cancel_lease("si1", cs1)
584 readers = ss.remote_get_buckets("si1")
585 self.failUnlessEqual(len(readers), 5)
586 # the corresponding renew should no longer work
587 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
589 leases = list(ss.get_leases("si1"))
590 self.failUnlessEqual(len(leases), 2)
591 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
593 ss.remote_renew_lease("si1", rs2)
594 # cancelling the second and third should make it go away
595 ss.remote_cancel_lease("si1", cs2)
596 ss.remote_cancel_lease("si1", cs2a)
597 readers = ss.remote_get_buckets("si1")
598 self.failUnlessEqual(len(readers), 0)
599 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
600 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
601 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
603 leases = list(ss.get_leases("si1"))
604 self.failUnlessEqual(len(leases), 0)
607 # test overlapping uploads
608 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
611 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
612 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
613 sharenums, size, canary)
614 self.failUnlessEqual(len(already), 0)
615 self.failUnlessEqual(len(writers), 5)
616 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
617 sharenums, size, canary)
618 self.failUnlessEqual(len(already2), 0)
619 self.failUnlessEqual(len(writers2), 0)
620 for wb in writers.values():
623 leases = list(ss.get_leases("si3"))
624 self.failUnlessEqual(len(leases), 1)
626 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
627 sharenums, size, canary)
628 self.failUnlessEqual(len(already3), 5)
629 self.failUnlessEqual(len(writers3), 0)
631 leases = list(ss.get_leases("si3"))
632 self.failUnlessEqual(len(leases), 2)
def test_readonly(self):
    """A server built with readonly_storage=True must hand out no writers.

    Also checks the stats it publishes: accepting_immutable_shares is 0,
    and disk_avail, when present at all, is 0.
    """
    basedir = self.workdir("test_readonly")
    server = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.sparent)

    present, buckets = self.allocate(server, "vid", [0,1,2], 75)
    self.failUnlessEqual(present, set())
    self.failUnlessEqual(buckets, {})

    accepting_key = "storage_server.accepting_immutable_shares"
    avail_key = "storage_server.disk_avail"
    report = server.get_stats()
    self.failUnlessEqual(report[accepting_key], 0)
    # Some platforms may not have an API to get disk stats, so the key
    # can legitimately be missing; nothing more to check in that case.
    if avail_key not in report:
        return
    # But if there are stats, readonly_storage means disk_avail=0
    self.failUnlessEqual(report[avail_key], 0)
650 def test_discard(self):
651 # discard is really only used for other tests, but we test it anyways
652 workdir = self.workdir("test_discard")
653 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
654 ss.setServiceParent(self.sparent)
656 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
657 self.failUnlessEqual(already, set())
658 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
659 for i,wb in writers.items():
660 wb.remote_write(0, "%25d" % i)
662 # since we discard the data, the shares should be present but sparse.
663 # Since we write with some seeks, the data we read back will be all
665 b = ss.remote_get_buckets("vid")
666 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
667 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
669 def test_advise_corruption(self):
670 workdir = self.workdir("test_advise_corruption")
671 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
672 ss.setServiceParent(self.sparent)
674 si0_s = base32.b2a("si0")
675 ss.remote_advise_corrupt_share("immutable", "si0", 0,
676 "This share smells funny.\n")
677 reportdir = os.path.join(workdir, "corruption-advisories")
678 reports = os.listdir(reportdir)
679 self.failUnlessEqual(len(reports), 1)
680 report_si0 = reports[0]
681 self.failUnless(si0_s in report_si0, report_si0)
682 f = open(os.path.join(reportdir, report_si0), "r")
685 self.failUnless("type: immutable" in report)
686 self.failUnless(("storage_index: %s" % si0_s) in report)
687 self.failUnless("share_number: 0" in report)
688 self.failUnless("This share smells funny." in report)
690 # test the RIBucketWriter version too
691 si1_s = base32.b2a("si1")
692 already,writers = self.allocate(ss, "si1", [1], 75)
693 self.failUnlessEqual(already, set())
694 self.failUnlessEqual(set(writers.keys()), set([1]))
695 writers[1].remote_write(0, "data")
696 writers[1].remote_close()
698 b = ss.remote_get_buckets("si1")
699 self.failUnlessEqual(set(b.keys()), set([1]))
700 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
702 reports = os.listdir(reportdir)
703 self.failUnlessEqual(len(reports), 2)
704 report_si1 = [r for r in reports if si1_s in r][0]
705 f = open(os.path.join(reportdir, report_si1), "r")
708 self.failUnless("type: immutable" in report)
709 self.failUnless(("storage_index: %s" % si1_s) in report)
710 self.failUnless("share_number: 1" in report)
711 self.failUnless("This share tastes like dust." in report)
715 class MutableServer(unittest.TestCase):
718 self.sparent = LoggingServiceParent()
719 self._lease_secret = itertools.count()
721 return self.sparent.stopService()
723 def workdir(self, name):
724 basedir = os.path.join("storage", "MutableServer", name)
727 def create(self, name):
728 workdir = self.workdir(name)
729 ss = StorageServer(workdir, "\x00" * 20)
730 ss.setServiceParent(self.sparent)
733 def test_create(self):
734 self.create("test_create")
def write_enabler(self, we_tag):
    """Return hashutil.tagged_hash of we_tag under the "we_blah" tag."""
    enabler = hashutil.tagged_hash("we_blah", we_tag)
    return enabler
def renew_secret(self, tag):
    """Return hashutil.tagged_hash of str(tag) under the "renew_blah" tag."""
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return hashutil.tagged_hash of str(tag) under the "cancel_blah" tag."""
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
745 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
746 write_enabler = self.write_enabler(we_tag)
747 renew_secret = self.renew_secret(lease_tag)
748 cancel_secret = self.cancel_secret(lease_tag)
749 rstaraw = ss.remote_slot_testv_and_readv_and_writev
750 testandwritev = dict( [ (shnum, ([], [], None) )
751 for shnum in sharenums ] )
753 rc = rstaraw(storage_index,
754 (write_enabler, renew_secret, cancel_secret),
757 (did_write, readv_data) = rc
758 self.failUnless(did_write)
759 self.failUnless(isinstance(readv_data, dict))
760 self.failUnlessEqual(len(readv_data), 0)
762 def test_bad_magic(self):
763 ss = self.create("test_bad_magic")
764 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
765 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
770 read = ss.remote_slot_readv
771 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
772 read, "si1", [0], [(0,10)])
773 self.failUnless(" had magic " in str(e), e)
774 self.failUnless(" but we wanted " in str(e), e)
776 def test_container_size(self):
777 ss = self.create("test_container_size")
778 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
780 read = ss.remote_slot_readv
781 rstaraw = ss.remote_slot_testv_and_readv_and_writev
782 secrets = ( self.write_enabler("we1"),
783 self.renew_secret("we1"),
784 self.cancel_secret("we1") )
785 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
786 answer = rstaraw("si1", secrets,
787 {0: ([], [(0,data)], len(data)+12)},
789 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
791 # trying to make the container too large will raise an exception
792 TOOBIG = MutableShareFile.MAX_SIZE + 10
793 self.failUnlessRaises(DataTooLargeError,
794 rstaraw, "si1", secrets,
795 {0: ([], [(0,data)], TOOBIG)},
798 # it should be possible to make the container smaller, although at
799 # the moment this doesn't actually affect the share, unless the
800 # container size is dropped to zero, in which case the share is
802 answer = rstaraw("si1", secrets,
803 {0: ([], [(0,data)], len(data)+8)},
805 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
807 answer = rstaraw("si1", secrets,
808 {0: ([], [(0,data)], 0)},
810 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
812 read_answer = read("si1", [0], [(0,10)])
813 self.failUnlessEqual(read_answer, {})
815 def test_allocate(self):
816 ss = self.create("test_allocate")
817 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
820 read = ss.remote_slot_readv
821 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
823 self.failUnlessEqual(read("si1", [], [(0, 10)]),
824 {0: [""], 1: [""], 2: [""]})
825 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
829 secrets = ( self.write_enabler("we1"),
830 self.renew_secret("we1"),
831 self.cancel_secret("we1") )
832 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
833 write = ss.remote_slot_testv_and_readv_and_writev
834 answer = write("si1", secrets,
835 {0: ([], [(0,data)], None)},
837 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
839 self.failUnlessEqual(read("si1", [0], [(0,20)]),
840 {0: ["00000000001111111111"]})
841 self.failUnlessEqual(read("si1", [0], [(95,10)]),
843 #self.failUnlessEqual(s0.remote_get_length(), 100)
845 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
846 f = self.failUnlessRaises(BadWriteEnablerError,
847 write, "si1", bad_secrets,
849 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
851 # this testv should fail
852 answer = write("si1", secrets,
853 {0: ([(0, 12, "eq", "444444444444"),
854 (20, 5, "eq", "22222"),
861 self.failUnlessEqual(answer, (False,
862 {0: ["000000000011", "22222"],
866 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
869 answer = write("si1", secrets,
870 {0: ([(10, 5, "lt", "11111"),
877 self.failUnlessEqual(answer, (False,
882 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
885 def test_operators(self):
886 # test operators, the data we're comparing is '11111' in all cases.
887 # test both fail+pass, reset data after each one.
888 ss = self.create("test_operators")
890 secrets = ( self.write_enabler("we1"),
891 self.renew_secret("we1"),
892 self.cancel_secret("we1") )
893 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
894 write = ss.remote_slot_testv_and_readv_and_writev
895 read = ss.remote_slot_readv
898 write("si1", secrets,
899 {0: ([], [(0,data)], None)},
905 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
910 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
911 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
912 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
915 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
920 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
921 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
924 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
929 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
930 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
934 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
939 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
940 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
943 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
948 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
949 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
952 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
957 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
958 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
962 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
967 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
968 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
971 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
976 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
977 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
981 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
986 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
987 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
995 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
996 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1000 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1005 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1006 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1009 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1014 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1015 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1018 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1023 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1024 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1028 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1033 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1034 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1037 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1042 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1043 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1046 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1051 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1052 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1055 # finally, test some operators against empty shares
1056 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1061 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1062 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
def test_readv(self):
    # Write three distinct shares into one slot, then read the first ten
    # bytes of every share with a single call.
    ss = self.create("test_readv")
    secrets = ( self.write_enabler("we1"),
                self.renew_secret("we1"),
                self.cancel_secret("we1") )
    write = ss.remote_slot_testv_and_readv_and_writev
    read = ss.remote_slot_readv
    # share N consists of the digit N repeated 100 times
    data = [("%d" % i) * 100 for i in range(3)]
    rc = write("si1", secrets,
               {0: ([], [(0,data[0])], None),
                1: ([], [(0,data[1])], None),
                2: ([], [(0,data[2])], None),
                }, [])
    self.failUnlessEqual(rc, (True, {}))

    # an empty share list is expected to yield data for all three shares
    answer = read("si1", [], [(0, 10)])
    self.failUnlessEqual(answer, {0: ["0"*10],
                                  1: ["1"*10],
                                  2: ["2"*10]})
def compare_leases_without_timestamps(self, leases_a, leases_b):
    # Assert that the two lease sequences have the same length and that
    # leases at the same position agree on owner_num, renew_secret,
    # cancel_secret and nodeid. expiration_time is deliberately not
    # compared.
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # the lengths were just checked, so zip() pairs every lease
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    # Assert that the two lease sequences have the same length and that
    # leases at the same position agree on every compared field,
    # including expiration_time.
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # the lengths were just checked, so zip() pairs every lease
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
    # Walk one mutable slot through the lease life cycle: implicit lease
    # creation on write, explicit add/renew/cancel, leases surviving a
    # container resize, errors for unknown secrets, and removal of the
    # slot once the last lease has been cancelled.
    ss = self.create("test_leases")
    def secrets(n):
        # same write-enabler every time, distinct renew/cancel secrets per n
        return ( self.write_enabler("we1"),
                 self.renew_secret("we1-%d" % n),
                 self.cancel_secret("we1-%d" % n) )
    data = "".join([ ("%d" % i) * 10 for i in range(10) ])
    write = ss.remote_slot_testv_and_readv_and_writev
    read = ss.remote_slot_readv

    def store(n):
        # (re)write share 0 of "si1" using the n'th set of secrets
        return write("si1", secrets(n), {0: ([], [(0,data)], None)}, [])
    def lease_count():
        return len(list(s0.get_leases()))
    def current_leases():
        return list(s0.get_leases())

    rc = store(0)
    self.failUnlessEqual(rc, (True, {}))

    # create a random non-numeric file in the bucket directory, to
    # exercise the code that's supposed to ignore those.
    bucket_dir = os.path.join(self.workdir("test_leases"),
                              "shares", storage_index_to_dir("si1"))
    f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
    f.write("you ought to be ignoring me\n")
    f.close()

    s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
    self.failUnlessEqual(lease_count(), 1)

    # add-lease on a missing storage index is silently ignored
    self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

    # re-allocate the slots and use the same secrets, that should update
    # the lease
    store(0)
    self.failUnlessEqual(lease_count(), 1)

    # renew it directly
    ss.remote_renew_lease("si1", secrets(0)[1])
    self.failUnlessEqual(lease_count(), 1)

    # now allocate them with a bunch of different secrets, to trigger the
    # extended lease code. Use add_lease for one of them.
    store(1)
    self.failUnlessEqual(lease_count(), 2)
    secrets2 = secrets(2)
    ss.remote_add_lease("si1", secrets2[1], secrets2[2])
    self.failUnlessEqual(lease_count(), 3)
    for n in (3, 4, 5):
        store(n)

    self.failUnlessEqual(lease_count(), 6)

    # cancel one of them
    ss.remote_cancel_lease("si1", secrets(5)[2])
    self.failUnlessEqual(lease_count(), 5)

    all_leases = current_leases()
    # and write enough data to expand the container, forcing the server
    # to move the leases
    write("si1", secrets(0),
          {0: ([], [(0,data)], 200), },
          [])

    # read back the leases, make sure they're still intact.
    self.compare_leases_without_timestamps(all_leases, current_leases())

    for n in range(5):
        ss.remote_renew_lease("si1", secrets(n)[1])
    self.compare_leases_without_timestamps(all_leases, current_leases())
    # get a new copy of the leases, with the current timestamps. Reading
    # data and failing to renew/cancel leases should leave the timestamps
    # alone.
    all_leases = current_leases()
    # renewing with a bogus token should prompt an error message

    # examine the exception thus raised, make sure the old nodeid is
    # present, to provide for share migration
    # NOTE(review): secrets(20) stands in for "a secret no lease uses";
    # confirm the index against the upstream test.
    e = self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
    e_s = str(e)
    self.failUnless("Unable to renew non-existent lease" in e_s)
    self.failUnless("I have leases accepted by nodeids:" in e_s)
    self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

    # same for cancelling
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si1",
                          secrets(20)[2])
    self.compare_leases(all_leases, current_leases())

    # reading shares should not modify the timestamp
    read("si1", [], [(0,200)])
    self.compare_leases(all_leases, current_leases())

    write("si1", secrets(0),
          {0: ([], [(200, "make me bigger")], None)}, [])
    self.compare_leases_without_timestamps(all_leases, current_leases())

    write("si1", secrets(0),
          {0: ([], [(500, "make me really bigger")], None)}, [])
    self.compare_leases_without_timestamps(all_leases, current_leases())

    # now cancel all but one of them
    for n in range(4):
        ss.remote_cancel_lease("si1", secrets(n)[2])

    # the slot should still be there
    remaining_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(len(remaining_shares), 1)
    self.failUnlessEqual(lease_count(), 1)

    # cancelling a non-existent lease should raise an IndexError
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si1", "nonsecret")

    # and the slot should still be there
    remaining_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(len(remaining_shares), 1)
    self.failUnlessEqual(lease_count(), 1)

    ss.remote_cancel_lease("si1", secrets(4)[2])
    # now the slot should be gone
    no_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(no_shares, {})

    # cancelling a lease on a non-existent share should raise an IndexError
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si2", "nonsecret")
def test_remove(self):
    # Delete the shares of one slot one at a time (by writing a new length
    # of zero) and check that the bucket directory disappears once the
    # last share is gone.
    ss = self.create("test_remove")
    # NOTE(review): the trailing allocate() arguments (share numbers and
    # size) were restored from context; confirm against the upstream test.
    self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                  set([0,1,2]), 100)
    readv = ss.remote_slot_readv
    writev = ss.remote_slot_testv_and_readv_and_writev
    secrets = ( self.write_enabler("we1"),
                self.renew_secret("we1"),
                self.cancel_secret("we1") )

    surviving = [0, 1, 2]
    for doomed in [0, 1, 2]:
        # delete this share by setting its size to zero
        answer = writev("si1", secrets,
                        {doomed: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before
        # the write
        before = dict([(shnum, []) for shnum in surviving])
        self.failUnlessEqual(answer, (True, before))
        # but a new read should show only the shares that are left
        # NOTE(review): never-written shares are assumed to read back as
        # the empty string; confirm.
        surviving.remove(doomed)
        after = dict([(shnum, [""]) for shnum in surviving])
        self.failUnlessEqual(readv("si1", [], [(0,10)]), after)

    # and the bucket directory should now be gone
    si = base32.b2a("si1")
    # note: this is a detail of the storage server implementation, and
    # may change in the future
    prefix = si[:2]
    prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
    bucketdir = os.path.join(prefixdir, si)
    self.failUnless(os.path.exists(prefixdir))
    self.failIf(os.path.exists(bucketdir))
class Stats(unittest.TestCase):
    # Checks the latency statistics reported by the storage server.

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        # directory under which the named test keeps its storage server
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name):
        # build a StorageServer for the named test, attached to the
        # per-test service parent
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        # Feed known samples into four latency categories and compare the
        # reported mean and percentiles against hand-computed values.
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))

        stat_names = ["mean",
                      "01_0_percentile", "10_0_percentile",
                      "50_0_percentile", "90_0_percentile",
                      "95_0_percentile", "99_0_percentile",
                      "99_9_percentile"]
        # (category, retained sample count, expected value per stat_name).
        # "allocate" got 10000 samples but only 1000 are expected to be
        # retained, and its expected values match the last 1000 samples.
        expectations = [
            ("allocate", 1000,
             [9500, 9010, 9100, 9500, 9900, 9950, 9990, 9999]),
            ("renew", 1000,
             [500, 10, 100, 500, 900, 950, 990, 999]),
            ("cancel", 10,
             [9, 0, 2, 10, 18, 18, 18, 18]),
            ("get", 1,
             [5, 5, 5, 5, 5, 5, 5, 5]),
            ]
        for category, retained, values in expectations:
            self.failUnlessEqual(len(ss.latencies[category]), retained)
            for stat, expected in zip(stat_names, values):
                actual = output[category][stat]
                # each statistic must be within 1 of the expected value
                self.failUnless(abs(actual - expected) < 1,
                                "%s[%s] is %r, expected about %r"
                                % (category, stat, actual, expected))
def remove_tags(s):
    # Replace every <...> tag with a space, then collapse each run of
    # whitespace into a single space.
    without_tags = re.sub(r'<[^>]*>', ' ', s)
    return re.sub(r'\s+', ' ', without_tags)
class MyBucketCountingCrawler(BucketCountingCrawler):
    # Bucket counter that fires one Deferred from self.hook_ds (set up by
    # the test) each time a prefix is finished.
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # NOTE(review): the emptiness guard and the callback value were
        # restored from context; confirm against the upstream test.
        pending = self.hook_ds
        if pending:
            pending.pop(0).callback(None)
class MyStorageServer(StorageServer):
    # Storage server whose bucket counter is a MyBucketCountingCrawler.
    def add_bucket_counter(self):
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        # the attribute is assigned before the crawler is attached, as in
        # the original ordering
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    # Tests for the storage server's bucket-counting crawler and the way
    # its progress shows up on the storage status page.

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_bucket_counter(self):
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        counter = ss.bucket_counter
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        counter.slow_start = 0
        orig_cpu_slice = counter.cpu_slice
        counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        w = StorageStatus(ss)

        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        for fragment in ("Accepting new shares: Yes",
                         "Reserved space: - 0 B (0)",
                         "Total buckets: Not computed yet",
                         "Next crawl in"):
            self.failUnless(fragment in s, s)

        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        # first prefix
        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = counter.get_state()
            self.failUnlessEqual(state["last-complete-prefix"],
                                 counter.prefixes[0])
            counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless(" Current crawl " in s, s)
            self.failUnless(" (next work in " in s, s)
        d.addCallback(_check)

        # now give it enough time to complete a full cycle
        def _watch():
            return not counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless("Total buckets: 0 (the number of" in s, s)
            self.failUnless("Next crawl in 59 minutes" in s, s)
        d.addCallback(_check2)
        return d

    def test_bucket_counter_cleanup(self):
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        counter = ss.bucket_counter
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        counter.slow_start = 0
        orig_cpu_slice = counter.cpu_slice
        counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        d = fireEventually()

        def _after_first_prefix(ignored):
            counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            state = counter.state
            self.failUnlessEqual(state["last-complete-prefix"],
                                 counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            counter.save_state()
        d.addCallback(_after_first_prefix)

        # now give it enough time to complete a cycle
        def _watch():
            return not counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            counter.cpu_slice = orig_cpu_slice
            s = counter.get_state()
            # the bogus entries injected above must have been discarded
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)
        return d

    def test_bucket_counter_eta(self):
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)

        d = defer.Deferred()

        def _rendered_text():
            return remove_tags(w.renderSynchronously())

        def _check_1(ignored):
            # no ETA is available yet
            self.failUnlessIn("complete (next work", _rendered_text())

        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            self.failUnlessIn("complete (ETA ", _rendered_text())

        def _check_3(ignored):
            # two prefixes have finished
            self.failUnlessIn("complete (ETA ", _rendered_text())
            # NOTE(review): the value passed to callback() was restored
            # from context; confirm against the upstream test.
            d.callback("done")

        # a failure in any check is forwarded to the Deferred returned to
        # trial
        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)

        ss.setServiceParent(self.s)
        return d
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    # Lease checker that a test can make give up its time slice right
    # after the first bucket, by setting stop_after_first_bucket = True.
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # one-shot: clear the flag and set a negative cpu_slice
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if self.stop_after_first_bucket:
            return
        # the one-shot stop is not pending, so use a large slice again
        self.cpu_slice = 500
class BrokenStatResults:
    # Empty attribute holder; attributes are attached to instances with
    # setattr() by the code that builds them.
    pass
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    # Lease checker whose stat() results never carry an st_blocks
    # attribute.
    def stat(self, fn):
        real = os.stat(fn)
        stripped = BrokenStatResults()
        for attrname in dir(real):
            # copy every public attribute except st_blocks
            if attrname.startswith("_") or attrname == "st_blocks":
                continue
            setattr(stripped, attrname, getattr(real, attrname))
        return stripped
class InstrumentedStorageServer(StorageServer):
    # Selects the instrumented lease checker. Presumably StorageServer
    # instantiates LeaseCheckerClass for its lease_checker -- TODO confirm.
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # Selects the lease checker whose stat() results lack st_blocks.
    # Presumably StorageServer instantiates LeaseCheckerClass for its
    # lease_checker -- TODO confirm.
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1540 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
def setUp(self):
    # start a fresh MultiService for the storage servers under test
    parent = service.MultiService()
    self.s = parent
    parent.startService()
def tearDown(self):
    # hand the result of stopService() back to the test framework
    stopping = self.s.stopService()
    return stopping
def make_shares(self, ss):
    # Populate ss with two immutable and two mutable shares. The second
    # share of each kind gets one extra lease. The storage indexes and all
    # renew/cancel secrets are recorded on self for the tests to use.
    def make(si):
        return (si, hashutil.tagged_hash("renew", si),
                hashutil.tagged_hash("cancel", si))
    def make_mutable(si):
        # same as make(), plus a write-enabler
        return make(si) + (hashutil.tagged_hash("write-enabler", si),)
    def make_extra_lease(si, num):
        return (hashutil.tagged_hash("renew-%d" % num, si),
                hashutil.tagged_hash("cancel-%d" % num, si))

    immutable_si_0, rs0, cs0 = make("\x00" * 16)
    immutable_si_1, rs1, cs1 = make("\x01" * 16)
    rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
    mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
    mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
    rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
    sharenums = [0]
    canary = FakeCanary()
    # note: 'tahoe debug dump-share' will not handle this file, since the
    # inner contents are not a valid CHK share
    data = "\xff" * 1000

    def upload_immutable(si, renew_secret, cancel_secret):
        # allocate share 0, fill it with data, and close it
        # NOTE(review): the allocated size (1000), the canary argument and
        # the remote_close() call were restored from context; confirm.
        a,w = ss.remote_allocate_buckets(si, renew_secret, cancel_secret,
                                         sharenums, 1000, canary)
        w[0].remote_write(0, data)
        w[0].remote_close()

    upload_immutable(immutable_si_0, rs0, cs0)

    upload_immutable(immutable_si_1, rs1, cs1)
    ss.remote_add_lease(immutable_si_1, rs1a, cs1a)

    writev = ss.remote_slot_testv_and_readv_and_writev
    writev(mutable_si_2, (we2, rs2, cs2),
           {0: ([], [(0,data)], len(data))}, [])
    writev(mutable_si_3, (we3, rs3, cs3),
           {0: ([], [(0,data)], len(data))}, [])
    ss.remote_add_lease(mutable_si_3, rs3a, cs3a)

    self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
    self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
    self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
def test_basic(self):
    # Run the lease crawler with expiration disabled. Inspect its state
    # and the status page before the crawl, right after the first bucket,
    # and after the first full cycle; nothing may be deleted.
    basedir = "storage/LeaseCrawler/basic"
    fileutil.make_dirs(basedir)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    # NOTE(review): the two assignments after this one (slow_start,
    # cpu_slice) were restored from context; confirm.
    lc = ss.lease_checker
    lc.slow_start = 0
    lc.cpu_slice = 500
    lc.stop_after_first_bucket = True
    webstatus = StorageStatus(ss)

    # create a few shares, with some leases on them
    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

    # add a non-sharefile to exercise another code path
    # NOTE(review): the file name and open mode were restored from
    # context; confirm.
    fn = os.path.join(ss.sharedir,
                      storage_index_to_dir(immutable_si_0),
                      "not-a-share")
    f = open(fn, "wb")
    f.write("I am not a share.\n")
    f.close()

    cycle_keys = ("cycle-to-date",
                  "estimated-remaining-cycle",
                  "estimated-current-cycle")

    # this is before the crawl has started, so we're not in a cycle yet
    initial_state = lc.get_state()
    self.failIf(lc.get_progress()["cycle-in-progress"])
    for key in cycle_keys:
        self.failIf(key in initial_state)
    self.failUnless("history" in initial_state)
    self.failUnlessEqual(initial_state["history"], {})

    ss.setServiceParent(self.s)

    # NOTE(review): DAY is assumed to be one day in seconds; confirm.
    DAY = 24*60*60

    d = fireEventually()

    # now examine the state right after the first bucket has been
    # processed.
    def _after_first_bucket(ignored):
        state = lc.get_state()
        for key in cycle_keys:
            self.failUnless(key in state)
        self.failUnless("history" in state)
        self.failUnlessEqual(state["history"], {})

        so_far = state["cycle-to-date"]
        self.failUnlessEqual(so_far["expiration-enabled"], False)
        self.failUnless("configured-expiration-mode" in so_far)
        self.failUnless("lease-age-histogram" in so_far)
        lah = so_far["lease-age-histogram"]
        self.failUnlessEqual(type(lah), list)
        self.failUnlessEqual(len(lah), 1)
        self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
        self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
        self.failUnlessEqual(so_far["corrupt-shares"], [])
        sr1 = so_far["space-recovered"]
        for key, expected in [("examined-buckets", 1),
                              ("examined-shares", 1),
                              ("actual-shares", 0),
                              ("configured-diskbytes", 0),
                              ("original-sharebytes", 0)]:
            self.failUnlessEqual(sr1[key], expected)
        left = state["estimated-remaining-cycle"]
        sr2 = left["space-recovered"]
        for key in ("examined-buckets", "examined-shares"):
            self.failUnless(sr2[key] > 0, sr2[key])
        for key in ("actual-shares", "configured-diskbytes",
                    "original-sharebytes"):
            self.failIfEqual(sr2[key], None)
    d.addCallback(_after_first_bucket)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html_in_cycle(html):
        s = remove_tags(html)
        expected_fragments = [
            "So far, this cycle has examined "
            "1 shares in 1 buckets (0 mutable / 1 immutable) ",
            "and has recovered: "
            "0 shares, 0 buckets (0 mutable / 0 immutable), "
            "0 B (0 B / 0 B)",
            "If expiration were enabled, "
            "we would have recovered: "
            "0 shares, 0 buckets (0 mutable / 0 immutable),"
            " 0 B (0 B / 0 B) by now",
            "and the remainder of this cycle "
            "would probably recover: "
            "0 shares, 0 buckets (0 mutable / 0 immutable),"
            " 0 B (0 B / 0 B)",
            "and the whole cycle would probably recover: "
            "0 shares, 0 buckets (0 mutable / 0 immutable),"
            " 0 B (0 B / 0 B)",
            "if we were strictly using each lease's default "
            "31-day lease lifetime",
            "this cycle would be expected to recover: ",
            ]
        for fragment in expected_fragments:
            self.failUnlessIn(fragment, s)
    d.addCallback(_check_html_in_cycle)

    # wait for the crawler to finish the first cycle. Nothing should have
    # been removed.
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        s = lc.get_state()
        for key in cycle_keys:
            self.failIf(key in s)
        last = s["history"][0]
        self.failUnless("cycle-start-finish-times" in last)
        self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
        self.failUnlessEqual(last["expiration-enabled"], False)
        self.failUnless("configured-expiration-mode" in last)

        self.failUnless("lease-age-histogram" in last)
        lah = last["lease-age-histogram"]
        self.failUnlessEqual(type(lah), list)
        self.failUnlessEqual(len(lah), 1)
        self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )

        self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
        self.failUnlessEqual(last["corrupt-shares"], [])

        rec = last["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 4)
        self.failUnlessEqual(rec["examined-shares"], 4)
        # expiration is off, so every "recovered" counter must be zero
        for unit in ("buckets", "shares", "diskbytes", "sharebytes"):
            for kind in ("actual", "original", "configured"):
                self.failUnlessEqual(rec["%s-%s" % (kind, unit)], 0)

        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))
        for si, expected_leases in [(immutable_si_0, 1),
                                    (immutable_si_1, 2),
                                    (mutable_si_2, 1),
                                    (mutable_si_3, 2)]:
            self.failUnlessEqual(count_leases(si), expected_leases)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("recovered: 0 shares, 0 buckets "
                          "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
                          "but expiration was not enabled", s)
    d.addCallback(_check_html)
    d.addCallback(lambda ign: self.render_json(webstatus))
    def _check_json(json):
        data = simplejson.loads(json)
        self.failUnless("lease-checker" in data)
        self.failUnless("lease-checker-progress" in data)
    d.addCallback(_check_json)
    return d
def backdate_lease(self, sf, renew_secret, new_expire_time):
    # Overwrite the expiration time of the lease in share file `sf` whose
    # renew secret is `renew_secret`. Returns None after rewriting the
    # first matching lease record; raises IndexError if no lease matches.
    #
    # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
    # "renew" a lease with a new_expire_time that is older than what the
    # current lease has), so we have to reach inside it.
    for i,lease in enumerate(sf.get_leases()):
        if lease.renew_secret == renew_secret:
            lease.expiration_time = new_expire_time
            f = open(sf.home, 'rb+')
            try:
                sf._write_lease_record(f, i, lease)
            finally:
                # close the handle even if writing the record fails
                f.close()
            return
    raise IndexError("unable to renew non-existent lease")
1769 def test_expire_age(self):
1770 basedir = "storage/LeaseCrawler/expire_age"
1771 fileutil.make_dirs(basedir)
1772 # setting expiration_time to 2000 means that any lease which is more
1773 # than 2000s old will be expired.
1774 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1775 expiration_enabled=True,
1776 expiration_mode="age",
1777 expiration_override_lease_duration=2000)
1778 # make it start sooner than usual.
1779 lc = ss.lease_checker
1781 lc.stop_after_first_bucket = True
1782 webstatus = StorageStatus(ss)
1784 # create a few shares, with some leases on them
1785 self.make_shares(ss)
1786 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1788 def count_shares(si):
1789 return len(list(ss._iter_share_files(si)))
1790 def _get_sharefile(si):
1791 return list(ss._iter_share_files(si))[0]
1792 def count_leases(si):
1793 return len(list(_get_sharefile(si).get_leases()))
1795 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1796 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1797 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1798 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1799 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1800 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1801 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1802 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1804 # artificially crank back the expiration time on the first lease of
1805 # each share, to make it look like it expired already (age=1000s).
1806 # Some shares have an extra lease which is set to expire at the
1807 # default time in 31 days from now (age=31days). We then run the
1808 # crawler, which will expire the first lease, making some shares get
1809 # deleted and others stay alive (with one remaining lease)
1812 sf0 = _get_sharefile(immutable_si_0)
1813 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1814 sf0_size = os.stat(sf0.home).st_size
1816 # immutable_si_1 gets an extra lease
1817 sf1 = _get_sharefile(immutable_si_1)
1818 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1820 sf2 = _get_sharefile(mutable_si_2)
1821 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1822 sf2_size = os.stat(sf2.home).st_size
1824 # mutable_si_3 gets an extra lease
1825 sf3 = _get_sharefile(mutable_si_3)
1826 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1828 ss.setServiceParent(self.s)
1830 d = fireEventually()
1831 # examine the state right after the first bucket has been processed
1832 def _after_first_bucket(ignored):
1833 p = lc.get_progress()
1834 self.failUnless(p["cycle-in-progress"])
1835 d.addCallback(_after_first_bucket)
1836 d.addCallback(lambda ign: self.render1(webstatus))
1837 def _check_html_in_cycle(html):
1838 s = remove_tags(html)
1839 # the first bucket encountered gets deleted, and its prefix
1840 # happens to be about 1/5th of the way through the ring, so the
1841 # predictor thinks we'll have 5 shares and that we'll delete them
1842 # all. This part of the test depends upon the SIs landing right
1843 # where they do now.
1844 self.failUnlessIn("The remainder of this cycle is expected to "
1845 "recover: 4 shares, 4 buckets", s)
1846 self.failUnlessIn("The whole cycle is expected to examine "
1847 "5 shares in 5 buckets and to recover: "
1848 "5 shares, 5 buckets", s)
1849 d.addCallback(_check_html_in_cycle)
1851 # wait for the crawler to finish the first cycle. Two shares should
1854 return bool(lc.get_state()["last-cycle-finished"] is not None)
1855 d.addCallback(lambda ign: self.poll(_wait))
1857 def _after_first_cycle(ignored):
1858 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1859 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1860 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1861 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1862 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1863 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1866 last = s["history"][0]
1868 self.failUnlessEqual(last["expiration-enabled"], True)
1869 self.failUnlessEqual(last["configured-expiration-mode"],
1870 ("age", 2000, None, ("mutable", "immutable")))
1871 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1873 rec = last["space-recovered"]
1874 self.failUnlessEqual(rec["examined-buckets"], 4)
1875 self.failUnlessEqual(rec["examined-shares"], 4)
1876 self.failUnlessEqual(rec["actual-buckets"], 2)
1877 self.failUnlessEqual(rec["original-buckets"], 2)
1878 self.failUnlessEqual(rec["configured-buckets"], 2)
1879 self.failUnlessEqual(rec["actual-shares"], 2)
1880 self.failUnlessEqual(rec["original-shares"], 2)
1881 self.failUnlessEqual(rec["configured-shares"], 2)
1882 size = sf0_size + sf2_size
1883 self.failUnlessEqual(rec["actual-sharebytes"], size)
1884 self.failUnlessEqual(rec["original-sharebytes"], size)
1885 self.failUnlessEqual(rec["configured-sharebytes"], size)
1886 # different platforms have different notions of "blocks used by
1887 # this file", so merely assert that it's a number
1888 self.failUnless(rec["actual-diskbytes"] >= 0,
1889 rec["actual-diskbytes"])
1890 self.failUnless(rec["original-diskbytes"] >= 0,
1891 rec["original-diskbytes"])
1892 self.failUnless(rec["configured-diskbytes"] >= 0,
1893 rec["configured-diskbytes"])
1894 d.addCallback(_after_first_cycle)
1895 d.addCallback(lambda ign: self.render1(webstatus))
1896 def _check_html(html):
1897 s = remove_tags(html)
1898 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1899 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1900 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1901 d.addCallback(_check_html)
def test_expire_cutoff_date(self):
    # Run the lease checker in "cutoff-date" mode over four buckets. The
    # first lease on every share is backdated past the cutoff; the two
    # shares that have no other lease must be deleted, the two that carry
    # an extra lease must survive with one lease left. Crawler state and
    # the rendered status page are checked mid-cycle and after the cycle.
    # Returns the Deferred that fires once every check has run.
    basedir = "storage/LeaseCrawler/expire_cutoff_date"
    fileutil.make_dirs(basedir)
    # setting cutoff-date to 2000 seconds ago means that any lease which
    # is more than 2000s old will be expired.
    now = time.time()
    then = int(now - 2000)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                   expiration_enabled=True,
                                   expiration_mode="cutoff-date",
                                   expiration_cutoff_date=then)
    # make it start sooner than usual.
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0
    lc.stop_after_first_bucket = True
    webstatus = StorageStatus(ss)

    # create a few shares, with some leases on them
    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    self.failUnlessEqual(count_shares(immutable_si_0), 1)
    self.failUnlessEqual(count_leases(immutable_si_0), 1)
    self.failUnlessEqual(count_shares(immutable_si_1), 1)
    self.failUnlessEqual(count_leases(immutable_si_1), 2)
    self.failUnlessEqual(count_shares(mutable_si_2), 1)
    self.failUnlessEqual(count_leases(mutable_si_2), 1)
    self.failUnlessEqual(count_shares(mutable_si_3), 1)
    self.failUnlessEqual(count_leases(mutable_si_3), 2)

    # artificially crank back the expiration time on the first lease of
    # each share, to make it look like it was renewed 3000s ago. To
    # achieve this, we need to set the expiration time to
    # now-3000+31days. This will change when the lease format is
    # improved to contain both create/renew time and duration.
    new_expiration_time = now - 3000 + 31*24*60*60

    # Some shares have an extra lease which is set to expire at the
    # default time in 31 days from now (age=31days). We then run the
    # crawler, which will expire the first lease, making some shares get
    # deleted and others stay alive (with one remaining lease)

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf0_size = os.stat(sf0.home).st_size

    # immutable_si_1 gets an extra lease
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf2_size = os.stat(sf2.home).st_size

    # mutable_si_3 gets an extra lease
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

    ss.setServiceParent(self.s)

    d = fireEventually()
    # examine the state right after the first bucket has been processed
    def _after_first_bucket(ignored):
        p = lc.get_progress()
        self.failUnless(p["cycle-in-progress"])
    d.addCallback(_after_first_bucket)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html_in_cycle(html):
        s = remove_tags(html)
        # the first bucket encountered gets deleted, and its prefix
        # happens to be about 1/5th of the way through the ring, so the
        # predictor thinks we'll have 5 shares and that we'll delete them
        # all. This part of the test depends upon the SIs landing right
        # where they do now.
        self.failUnlessIn("The remainder of this cycle is expected to "
                          "recover: 4 shares, 4 buckets", s)
        self.failUnlessIn("The whole cycle is expected to examine "
                          "5 shares in 5 buckets and to recover: "
                          "5 shares, 5 buckets", s)
    d.addCallback(_check_html_in_cycle)

    # wait for the crawler to finish the first cycle. Two shares should
    # have been removed
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 1)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 1)

        s = lc.get_state()
        last = s["history"][0]

        self.failUnlessEqual(last["expiration-enabled"], True)
        self.failUnlessEqual(last["configured-expiration-mode"],
                             ("cutoff-date", None, then,
                              ("mutable", "immutable")))
        # two shares started with one lease, two with two leases
        self.failUnlessEqual(last["leases-per-share-histogram"],
                             {1: 2, 2: 2})

        rec = last["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 4)
        self.failUnlessEqual(rec["examined-shares"], 4)
        self.failUnlessEqual(rec["actual-buckets"], 2)
        self.failUnlessEqual(rec["original-buckets"], 0)
        self.failUnlessEqual(rec["configured-buckets"], 2)
        self.failUnlessEqual(rec["actual-shares"], 2)
        self.failUnlessEqual(rec["original-shares"], 0)
        self.failUnlessEqual(rec["configured-shares"], 2)
        size = sf0_size + sf2_size
        self.failUnlessEqual(rec["actual-sharebytes"], size)
        self.failUnlessEqual(rec["original-sharebytes"], 0)
        self.failUnlessEqual(rec["configured-sharebytes"], size)
        # different platforms have different notions of "blocks used by
        # this file", so merely assert that it's a number
        self.failUnless(rec["actual-diskbytes"] >= 0,
                        rec["actual-diskbytes"])
        self.failUnless(rec["original-diskbytes"] >= 0,
                        rec["original-diskbytes"])
        self.failUnless(rec["configured-diskbytes"] >= 0,
                        rec["configured-diskbytes"])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Expiration Enabled:"
                          " expired leases will be removed", s)
        date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
        substr = "Leases created or last renewed before %s will be considered expired." % date
        self.failUnlessIn(substr, s)
        self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
    d.addCallback(_check_html)
    # hand the Deferred back so the test runner waits for the callbacks
    return d
def test_only_immutable(self):
    # With expiration_sharetypes=("immutable",) and every lease backdated
    # past the cutoff, one crawler cycle must delete both immutable
    # shares and leave the mutable shares and their leases untouched.
    # Returns the Deferred that fires once every check has run.
    basedir = "storage/LeaseCrawler/only_immutable"
    fileutil.make_dirs(basedir)
    now = time.time()
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("immutable",))
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # poll until the crawler reports a completed first cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 0)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
    d.addCallback(_check_html)
    # hand the Deferred back so the test runner waits for the callbacks
    return d
def test_only_mutable(self):
    # With expiration_sharetypes=("mutable",) and every lease backdated
    # past the cutoff, one crawler cycle must delete both mutable shares
    # and leave the immutable shares and their leases untouched.
    # Returns the Deferred that fires once every check has run.
    basedir = "storage/LeaseCrawler/only_mutable"
    fileutil.make_dirs(basedir)
    now = time.time()
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("mutable",))
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # poll until the crawler reports a completed first cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 0)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
    d.addCallback(_check_html)
    # hand the Deferred back so the test runner waits for the callbacks
    return d
def test_bad_mode(self):
    # Constructing a StorageServer with an unrecognized expiration_mode
    # must raise ValueError, and the message must name the bad mode.
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    nodeid = "\x00" * 20
    err = self.failUnlessRaises(ValueError,
                                StorageServer, workdir, nodeid,
                                expiration_mode="bogus")
    message = str(err)
    expected = "GC mode 'bogus' must be 'age' or 'cutoff-date'"
    self.failUnless(expected in message, message)
def test_parse_duration(self):
    # time_format.parse_duration must turn "<number><unit>" strings into
    # seconds and reject an unknown unit with a descriptive ValueError.
    DAY = 24*60*60
    # NOTE(review): month and year lengths are assumed to match what
    # parse_duration uses (31-day month, 365-day year) -- confirm.
    MONTH = 31*DAY
    YEAR = 365*DAY
    p = time_format.parse_duration
    self.failUnlessEqual(p("7days"), 7*DAY)
    self.failUnlessEqual(p("31day"), 31*DAY)
    self.failUnlessEqual(p("60 days"), 60*DAY)
    self.failUnlessEqual(p("2mo"), 2*MONTH)
    self.failUnlessEqual(p("3 month"), 3*MONTH)
    self.failUnlessEqual(p("2years"), 2*YEAR)
    e = self.failUnlessRaises(ValueError, p, "2kumquats")
    self.failUnless("no unit (like day, month, or year) in '2kumquats'"
                    in str(e), str(e))
def test_parse_date(self):
    # time_format.parse_date must return an int, and 2009-03-18 must map
    # to the timestamp 1237334400.
    parse = time_format.parse_date
    parsed = parse("2009-03-18")
    self.failUnless(isinstance(parsed, int))
    self.failUnlessEqual(parse("2009-03-18"), 1237334400)
def test_limited_history(self):
    # Let the lease checker run at least 15 cycles, then check that its
    # history keeps exactly 10 entries: cycles 6 through 15.
    # Returns the Deferred that fires once the check has run.
    basedir = "storage/LeaseCrawler/limited_history"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0

    # create a few shares, with some leases on them
    self.make_shares(ss)

    ss.setServiceParent(self.s)

    def _wait_until_15_cycles_done():
        last = lc.state["last-cycle-finished"]
        if last is not None and last >= 15:
            return True
        # NOTE(review): presumably the crawler keeps its pending delayed
        # call in lc.timer; firing it at once avoids waiting out the gap
        # between cycles -- confirm against the crawler class.
        timer = getattr(lc, "timer", None)
        if timer:
            timer.reset(0)
        return False
    d = self.poll(_wait_until_15_cycles_done)

    def _check(ignored):
        h = lc.get_state()["history"]
        self.failUnlessEqual(len(h), 10)
        self.failUnlessEqual(max(h.keys()), 15)
        self.failUnlessEqual(min(h.keys()), 6)
    d.addCallback(_check)
    # hand the Deferred back so the test runner waits for the callbacks
    return d
def test_unpredictable_future(self):
    # Interrupt the crawler before it has made measurable progress and
    # check that both cycle estimates report None for every
    # space-recovered counter.
    # Returns the Deferred that fires once the check has run.
    basedir = "storage/LeaseCrawler/unpredictable_future"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0
    lc.cpu_slice = -1.0 # stop quickly

    self.make_shares(ss)

    ss.setServiceParent(self.s)

    d = fireEventually()
    def _check(ignored):
        # this should fire after the first bucket is complete, but before
        # the first prefix is complete, so the progress-measurer won't
        # think we've gotten far enough to raise our percent-complete
        # above 0%, triggering the cannot-predict-the-future code in
        # expirer.py . This will have to change if/when the
        # progress-measurer gets smart enough to count buckets (we'll
        # have to interrupt it even earlier, before it's finished the
        # first bucket).
        s = lc.get_state()
        self.failUnless("cycle-to-date" in s)
        self.failUnless("estimated-remaining-cycle" in s)
        self.failUnless("estimated-current-cycle" in s)

        counters = ["%s-%s" % (which, unit)
                    for unit in ("buckets", "shares",
                                 "diskbytes", "sharebytes")
                    for which in ("actual", "original", "configured")]
        # check the remaining-cycle estimate AND the whole-cycle
        # estimate; the second pass used to re-read the first section
        for section in ("estimated-remaining-cycle",
                        "estimated-current-cycle"):
            rec = s[section]["space-recovered"]
            for name in counters:
                self.failUnlessEqual(rec[name], None, (section, name))

    d.addCallback(_check)
    # hand the Deferred back so the test runner waits for the callback
    return d
def test_no_st_blocks(self):
    # Run one crawler cycle on a server built from
    # No_ST_BLOCKS_StorageServer and check that the "configured-"
    # diskbytes figure equals the sharebytes figure.
    # Returns the Deferred that fires once the check has run.
    basedir = "storage/LeaseCrawler/no_st_blocks"
    fileutil.make_dirs(basedir)
    ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                    expiration_mode="age",
                                    expiration_override_lease_duration=-1000)
    # a negative expiration_override_lease_duration= means the
    # "configured-" space-recovered counts will be non-zero, since all
    # shares will count as already expired

    # make it start sooner than usual.
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0

    self.make_shares(ss)
    ss.setServiceParent(self.s)
    # poll until the crawler reports a completed first cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _check(ignored):
        s = lc.get_state()
        last = s["history"][0]
        rec = last["space-recovered"]
        self.failUnlessEqual(rec["configured-buckets"], 4)
        self.failUnlessEqual(rec["configured-shares"], 4)
        self.failUnless(rec["configured-sharebytes"] > 0,
                        rec["configured-sharebytes"])
        # without the .st_blocks field in os.stat() results, we should be
        # reporting diskbytes==sharebytes
        self.failUnlessEqual(rec["configured-sharebytes"],
                             rec["configured-diskbytes"])
    d.addCallback(_check)
    # hand the Deferred back so the test runner waits for the callback
    return d
def test_share_corruption(self):
    # Overwrite the start of one share file with garbage, add an empty
    # bucket directory, and check that the lease checker records the
    # corrupt share (in its state, the JSON status and the HTML status)
    # and still finishes the cycle over the remaining buckets.
    # Returns the Deferred that fires once every check has run.
    self._poll_should_ignore_these_errors = [
        UnknownMutableContainerVersionError,
        UnknownImmutableContainerVersionError,
        ]
    basedir = "storage/LeaseCrawler/share_corruption"
    fileutil.make_dirs(basedir)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20)
    w = StorageStatus(ss)
    # make it start sooner than usual.
    lc = ss.lease_checker
    lc.stop_after_first_bucket = True
    # NOTE(review): assumes the crawler's start-up delay is governed by
    # its slow_start attribute -- confirm against the crawler class.
    lc.slow_start = 0

    # create a few shares, with some leases on them
    self.make_shares(ss)

    # now corrupt one, and make sure the lease-checker keeps going
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    first = min(self.sis)
    first_b32 = base32.b2a(first)
    fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
    # overwrite the container header in place, keeping the rest of the
    # file; close the handle even if the write fails
    f = open(fn, "rb+")
    try:
        f.seek(0)
        f.write("BAD MAGIC")
    finally:
        f.close()
    # if get_share_file() doesn't see the correct mutable magic, it
    # assumes the file is an immutable share, and then
    # immutable.ShareFile sees a bad version. So regardless of which kind
    # of share we corrupted, this will trigger an
    # UnknownImmutableContainerVersionError.

    # also create an empty bucket
    empty_si = base32.b2a("\x04"*16)
    empty_bucket_dir = os.path.join(ss.sharedir,
                                    storage_index_to_dir(empty_si))
    fileutil.make_dirs(empty_bucket_dir)

    ss.setServiceParent(self.s)

    d = fireEventually()

    # now examine the state right after the first bucket has been
    # processed.
    def _after_first_bucket(ignored):
        so_far = lc.get_state()["cycle-to-date"]
        rec = so_far["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 1)
        self.failUnlessEqual(rec["examined-shares"], 0)
        self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_bucket)

    d.addCallback(lambda ign: self.render_json(w))
    def _check_json(json):
        data = simplejson.loads(json)
        # grr. json turns all dict keys into strings.
        so_far = data["lease-checker"]["cycle-to-date"]
        corrupt_shares = so_far["corrupt-shares"]
        # it also turns all tuples into lists
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html)

    # poll until the crawler reports a completed first cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        s = lc.get_state()
        last = s["history"][0]
        rec = last["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 5)
        self.failUnlessEqual(rec["examined-shares"], 3)
        self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render_json(w))
    def _check_json_history(json):
        data = simplejson.loads(json)
        last = data["lease-checker"]["history"]["0"]
        corrupt_shares = last["corrupt-shares"]
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json_history)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html_history(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html_history)

    # the corruption is expected to log errors; flush them on success and
    # on failure alike, passing the result through unchanged
    def _cleanup(res):
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
        return res
    d.addBoth(_cleanup)
    return d
def render_json(self, page):
    # Render `page` with the query argument t=json and return whatever
    # self.render1() returns, so callers can chain callbacks on it.
    return self.render1(page, args={"t": ["json"]})
class NoDiskStatsServer(StorageServer):
    # StorageServer variant whose disk-stats query always fails with
    # AttributeError.
    def get_disk_stats(self):
        raise AttributeError()
class BadDiskStatsServer(StorageServer):
    # StorageServer variant whose disk-stats query fails when called.
    def get_disk_stats(self):
        # NOTE(review): OSError is assumed to be the failure the server
        # treats as "disk stats call failed" -- confirm against
        # StorageServer's handling of get_disk_stats() errors.
        raise OSError
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    # Tests for the status page that StorageStatus renders for a storage
    # server (HTML and t=json forms).

    def setUp(self):
        # parent service that the StorageServer instances are attached to
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnless("<h1>No Storage Server Running</h1>" in html, html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnless("<h1>Storage Server Status</h1>" in html, html)
            s = remove_tags(html)
            self.failUnless("Accepting new shares: Yes" in s, s)
            self.failUnless("Reserved space: - 0 B (0)" in s, s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # NOTE(review): the storage_server.* counters are assumed to
            # live under the top-level "stats" key -- confirm against the
            # JSON renderer.
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnless("bucket-counter" in data)
            self.failUnless("lease-checker" in data)
        d.addCallback(_check_json)
        # hand the Deferred back so the test runner waits for the callbacks
        return d

    def render_json(self, page):
        # render with t=json and return the result for callers to chain on
        d = self.render1(page, args={"t": ["json"]})
        return d

    def test_status_no_disk_stats(self):
        # Some platforms may have no disk stats API. Make sure the code can handle that
        # (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = NoDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Total disk space: ?" in s, s)
        self.failUnless("Space Available to Tahoe: ?" in s, s)
        self.failUnless(ss.get_available_space() is None)

    def test_status_bad_disk_stats(self):
        # If the API to get disk stats exists but a call to it fails, then the status should
        # show that no shares will be accepted, and get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = BadDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: No" in s, s)
        self.failUnless("Total disk space: ?" in s, s)
        self.failUnless("Space Available to Tahoe: ?" in s, s)
        self.failUnless(ss.get_available_space() == 0)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: No" in s, s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)

    def test_huge_reserved(self):
        # use a directory of its own rather than sharing test_reserved's
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        # NOTE(review): the reservation is the same 10e6 as in
        # test_reserved, so nothing "huge" is exercised here -- confirm
        # the intended value and its expected rendering.
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)