2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Create the scratch directories used by one Bucket test case.

    Makes storage/Bucket/<name> and its "tmp" subdirectory, then returns
    the pair (incoming, final): the "bucket" path under "tmp" and the
    "bucket" path directly under the base directory. Neither "bucket"
    path itself is created here.
    """
    workdir = os.path.join("storage", "Bucket", name)
    tmpdir = os.path.join(workdir, "tmp")
    # Same two make_dirs calls as before, in the same order.
    for directory in (workdir, tmpdir):
        fileutil.make_dirs(directory)
    return os.path.join(tmpdir, "bucket"), os.path.join(workdir, "bucket")
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnless("to peer" in repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    """Run the shared read/write check with WriteBucketProxy.

    Delegates to _do_test_readwrite with a header size of 0x24 and
    returns whatever that helper returns.
    """
    header_size = 0x24
    return self._do_test_readwrite("test_readwrite_v1", header_size,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    """Run the shared read/write check with WriteBucketProxy_v2.

    Delegates to _do_test_readwrite with a header size of 0x44 and
    returns whatever that helper returns.
    """
    header_size = 0x44
    return self._do_test_readwrite("test_readwrite_v2", header_size,
                                   WriteBucketProxy_v2, ReadBucketProxy)
230 class FakeDiskStorageServer(StorageServer):
def get_disk_stats(self):
    """Report fake disk statistics derived from self.DISKAVAIL.

    Returns a dict with two keys: 'free_for_nonroot' is DISKAVAIL itself,
    and 'avail' is DISKAVAIL minus self.reserved_space, clamped so that it
    never drops below zero.
    """
    free = self.DISKAVAIL
    usable = free - self.reserved_space
    if usable < 0:
        # The reservation exceeds the free space: nothing is available.
        usable = 0
    return {'free_for_nonroot': free, 'avail': usable}
235 class Server(unittest.TestCase):
238 self.sparent = LoggingServiceParent()
239 self.sparent.startService()
240 self._lease_secret = itertools.count()
242 return self.sparent.stopService()
244 def workdir(self, name):
245 basedir = os.path.join("storage", "Server", name)
248 def create(self, name, reserved_space=0, klass=StorageServer):
249 workdir = self.workdir(name)
250 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
251 stats_provider=FakeStatsProvider())
252 ss.setServiceParent(self.sparent)
255 def test_create(self):
256 ss = self.create("test_create")
258 def allocate(self, ss, storage_index, sharenums, size, canary=None):
259 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
262 canary = FakeCanary()
263 return ss.remote_allocate_buckets(storage_index,
264 renew_secret, cancel_secret,
265 sharenums, size, canary)
267 def test_large_share(self):
268 ss = self.create("test_large_share")
270 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
271 self.failUnlessEqual(already, set())
272 self.failUnlessEqual(set(writers.keys()), set([0]))
274 shnum, bucket = writers.items()[0]
275 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
276 bucket.remote_write(2**32, "ab")
277 bucket.remote_close()
279 readers = ss.remote_get_buckets("allocate")
280 reader = readers[shnum]
281 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
282 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
284 def test_dont_overfill_dirs(self):
286 This test asserts that if you add a second share whose storage index
287 share lots of leading bits with an extant share (but isn't the exact
288 same storage index), this won't add an entry to the share directory.
290 ss = self.create("test_dont_overfill_dirs")
291 already, writers = self.allocate(ss, "storageindex", [0], 10)
292 for i, wb in writers.items():
293 wb.remote_write(0, "%10d" % i)
295 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
297 children_of_storedir = set(os.listdir(storedir))
299 # Now store another one under another storageindex that has leading
300 # chars the same as the first storageindex.
301 already, writers = self.allocate(ss, "storageindey", [0], 10)
302 for i, wb in writers.items():
303 wb.remote_write(0, "%10d" % i)
305 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
307 new_children_of_storedir = set(os.listdir(storedir))
308 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
310 def test_remove_incoming(self):
311 ss = self.create("test_remove_incoming")
312 already, writers = self.allocate(ss, "vid", range(3), 10)
313 for i,wb in writers.items():
314 wb.remote_write(0, "%10d" % i)
316 incoming_share_dir = wb.incominghome
317 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
318 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
319 incoming_dir = os.path.dirname(incoming_prefix_dir)
320 self.failIf(os.path.exists(incoming_bucket_dir))
321 self.failIf(os.path.exists(incoming_prefix_dir))
322 self.failUnless(os.path.exists(incoming_dir))
324 def test_allocate(self):
325 ss = self.create("test_allocate")
327 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
329 canary = FakeCanary()
330 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
331 self.failUnlessEqual(already, set())
332 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
334 # while the buckets are open, they should not count as readable
335 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
338 for i,wb in writers.items():
339 wb.remote_write(0, "%25d" % i)
341 # aborting a bucket that was already closed is a no-op
344 # now they should be readable
345 b = ss.remote_get_buckets("allocate")
346 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
347 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
349 self.failUnless("BucketReader" in b_str, b_str)
350 self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
352 # now if we ask about writing again, the server should offer those
353 # three buckets as already present. It should offer them even if we
354 # don't ask about those specific ones.
355 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
356 self.failUnlessEqual(already, set([0,1,2]))
357 self.failUnlessEqual(set(writers.keys()), set([3,4]))
359 # while those two buckets are open for writing, the server should
360 # refuse to offer them to uploaders
362 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
363 self.failUnlessEqual(already2, set([0,1,2]))
364 self.failUnlessEqual(set(writers2.keys()), set([5]))
366 # aborting the writes should remove the tempfiles
367 for i,wb in writers2.items():
369 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
370 self.failUnlessEqual(already2, set([0,1,2]))
371 self.failUnlessEqual(set(writers2.keys()), set([5]))
373 for i,wb in writers2.items():
375 for i,wb in writers.items():
378 def test_bad_container_version(self):
379 ss = self.create("test_bad_container_version")
380 a,w = self.allocate(ss, "si1", [0], 10)
381 w[0].remote_write(0, "\xff"*10)
384 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
387 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
390 b = ss.remote_get_buckets("allocate")
392 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
393 ss.remote_get_buckets, "si1")
394 self.failUnless(" had version 0 but we wanted 1" in str(e), e)
396 def test_disconnect(self):
397 # simulate a disconnection
398 ss = self.create("test_disconnect")
399 canary = FakeCanary()
400 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
401 self.failUnlessEqual(already, set())
402 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
403 for (f,args,kwargs) in canary.disconnectors.values():
408 # that ought to delete the incoming shares
409 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
410 self.failUnlessEqual(already, set())
411 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
413 def test_reserved_space(self):
414 ss = self.create("test_reserved_space", reserved_space=10000,
415 klass=FakeDiskStorageServer)
416 # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
418 # 15k available, 10k reserved, leaves 5k for shares
420 # a newly created and filled share incurs this much overhead, beyond
421 # the size we request.
423 LEASE_SIZE = 4+32+32+4
424 canary = FakeCanary(True)
425 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
426 self.failUnlessEqual(len(writers), 3)
427 # now the StorageServer should have 3000 bytes provisionally
428 # allocated, allowing only 2000 more to be claimed
429 self.failUnlessEqual(len(ss._active_writers), 3)
431 # allocating 1001-byte shares only leaves room for one
432 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
433 self.failUnlessEqual(len(writers2), 1)
434 self.failUnlessEqual(len(ss._active_writers), 4)
436 # we abandon the first set, so their provisional allocation should be
440 self.failUnlessEqual(len(ss._active_writers), 1)
441 # now we have a provisional allocation of 1001 bytes
443 # and we close the second set, so their provisional allocation should
444 # become real, long-term allocation, and grows to include the
446 for bw in writers2.values():
447 bw.remote_write(0, "a"*25)
452 self.failUnlessEqual(len(ss._active_writers), 0)
454 allocated = 1001 + OVERHEAD + LEASE_SIZE
456 # we have to manually increase DISKAVAIL, since we're not doing real
458 ss.DISKAVAIL -= allocated
460 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
461 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
462 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
463 self.failUnlessEqual(len(writers3), 39)
464 self.failUnlessEqual(len(ss._active_writers), 39)
468 self.failUnlessEqual(len(ss._active_writers), 0)
469 ss.disownServiceParent()
472 def test_disk_stats(self):
473 # This will spuriously fail if there is zero disk space left (but so will other tests).
474 ss = self.create("test_disk_stats", reserved_space=0)
476 disk = ss.get_disk_stats()
477 self.failUnless(disk['total'] > 0, disk['total'])
478 self.failUnless(disk['used'] > 0, disk['used'])
479 self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
480 self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
481 self.failUnless(disk['avail'] > 0, disk['avail'])
483 def test_disk_stats_avail_nonnegative(self):
484 ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)
486 disk = ss.get_disk_stats()
487 self.failUnlessEqual(disk['avail'], 0)
490 basedir = self.workdir("test_seek_behavior")
491 fileutil.make_dirs(basedir)
492 filename = os.path.join(basedir, "testfile")
493 f = open(filename, "wb")
496 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
497 # files. mode="a" preserves previous contents but does not allow
498 # seeking-to-create-holes. mode="r+" allows both.
499 f = open(filename, "rb+")
503 filelen = os.stat(filename)[stat.ST_SIZE]
504 self.failUnlessEqual(filelen, 100+3)
505 f2 = open(filename, "rb")
506 self.failUnlessEqual(f2.read(5), "start")
509 def test_leases(self):
510 ss = self.create("test_leases")
511 canary = FakeCanary()
515 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
516 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
517 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
518 sharenums, size, canary)
519 self.failUnlessEqual(len(already), 0)
520 self.failUnlessEqual(len(writers), 5)
521 for wb in writers.values():
524 leases = list(ss.get_leases("si0"))
525 self.failUnlessEqual(len(leases), 1)
526 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
528 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
529 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
530 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
531 sharenums, size, canary)
532 for wb in writers.values():
535 # take out a second lease on si1
536 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
537 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
538 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
539 sharenums, size, canary)
540 self.failUnlessEqual(len(already), 5)
541 self.failUnlessEqual(len(writers), 0)
543 leases = list(ss.get_leases("si1"))
544 self.failUnlessEqual(len(leases), 2)
545 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
547 # and a third lease, using add-lease
548 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
549 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
550 ss.remote_add_lease("si1", rs2a, cs2a)
551 leases = list(ss.get_leases("si1"))
552 self.failUnlessEqual(len(leases), 3)
553 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
555 # add-lease on a missing storage index is silently ignored
556 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
558 # check that si0 is readable
559 readers = ss.remote_get_buckets("si0")
560 self.failUnlessEqual(len(readers), 5)
562 # renew the first lease. Only the proper renew_secret should work
563 ss.remote_renew_lease("si0", rs0)
564 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
565 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
567 # check that si0 is still readable
568 readers = ss.remote_get_buckets("si0")
569 self.failUnlessEqual(len(readers), 5)
572 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
573 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
574 ss.remote_cancel_lease("si0", cs0)
576 # si0 should now be gone
577 readers = ss.remote_get_buckets("si0")
578 self.failUnlessEqual(len(readers), 0)
579 # and the renew should no longer work
580 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
583 # cancel the first lease on si1, leaving the second and third in place
584 ss.remote_cancel_lease("si1", cs1)
585 readers = ss.remote_get_buckets("si1")
586 self.failUnlessEqual(len(readers), 5)
587 # the corresponding renew should no longer work
588 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
590 leases = list(ss.get_leases("si1"))
591 self.failUnlessEqual(len(leases), 2)
592 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
594 ss.remote_renew_lease("si1", rs2)
595 # cancelling the second and third should make it go away
596 ss.remote_cancel_lease("si1", cs2)
597 ss.remote_cancel_lease("si1", cs2a)
598 readers = ss.remote_get_buckets("si1")
599 self.failUnlessEqual(len(readers), 0)
600 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
601 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
602 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
604 leases = list(ss.get_leases("si1"))
605 self.failUnlessEqual(len(leases), 0)
608 # test overlapping uploads
609 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
610 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
611 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
612 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
613 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
614 sharenums, size, canary)
615 self.failUnlessEqual(len(already), 0)
616 self.failUnlessEqual(len(writers), 5)
617 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
618 sharenums, size, canary)
619 self.failUnlessEqual(len(already2), 0)
620 self.failUnlessEqual(len(writers2), 0)
621 for wb in writers.values():
624 leases = list(ss.get_leases("si3"))
625 self.failUnlessEqual(len(leases), 1)
627 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
628 sharenums, size, canary)
629 self.failUnlessEqual(len(already3), 5)
630 self.failUnlessEqual(len(writers3), 0)
632 leases = list(ss.get_leases("si3"))
633 self.failUnlessEqual(len(leases), 2)
635 def test_readonly(self):
636 workdir = self.workdir("test_readonly")
637 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
638 ss.setServiceParent(self.sparent)
640 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
641 self.failUnlessEqual(already, set())
642 self.failUnlessEqual(writers, {})
644 stats = ss.get_stats()
645 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
646 if "storage_server.disk_avail" in stats:
647 # Some platforms may not have an API to get disk stats.
648 # But if there are stats, readonly_storage means disk_avail=0
649 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
651 def test_discard(self):
652 # discard is really only used for other tests, but we test it anyways
653 workdir = self.workdir("test_discard")
654 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
655 ss.setServiceParent(self.sparent)
657 canary = FakeCanary()
658 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
659 self.failUnlessEqual(already, set())
660 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
661 for i,wb in writers.items():
662 wb.remote_write(0, "%25d" % i)
664 # since we discard the data, the shares should be present but sparse.
665 # Since we write with some seeks, the data we read back will be all
667 b = ss.remote_get_buckets("vid")
668 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
669 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
671 def test_advise_corruption(self):
672 workdir = self.workdir("test_advise_corruption")
673 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
674 ss.setServiceParent(self.sparent)
676 si0_s = base32.b2a("si0")
677 ss.remote_advise_corrupt_share("immutable", "si0", 0,
678 "This share smells funny.\n")
679 reportdir = os.path.join(workdir, "corruption-advisories")
680 reports = os.listdir(reportdir)
681 self.failUnlessEqual(len(reports), 1)
682 report_si0 = reports[0]
683 self.failUnless(si0_s in report_si0, report_si0)
684 f = open(os.path.join(reportdir, report_si0), "r")
687 self.failUnless("type: immutable" in report)
688 self.failUnless(("storage_index: %s" % si0_s) in report)
689 self.failUnless("share_number: 0" in report)
690 self.failUnless("This share smells funny." in report)
692 # test the RIBucketWriter version too
693 si1_s = base32.b2a("si1")
694 already,writers = self.allocate(ss, "si1", [1], 75)
695 self.failUnlessEqual(already, set())
696 self.failUnlessEqual(set(writers.keys()), set([1]))
697 writers[1].remote_write(0, "data")
698 writers[1].remote_close()
700 b = ss.remote_get_buckets("si1")
701 self.failUnlessEqual(set(b.keys()), set([1]))
702 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
704 reports = os.listdir(reportdir)
705 self.failUnlessEqual(len(reports), 2)
706 report_si1 = [r for r in reports if si1_s in r][0]
707 f = open(os.path.join(reportdir, report_si1), "r")
710 self.failUnless("type: immutable" in report)
711 self.failUnless(("storage_index: %s" % si1_s) in report)
712 self.failUnless("share_number: 1" in report)
713 self.failUnless("This share tastes like dust." in report)
717 class MutableServer(unittest.TestCase):
720 self.sparent = LoggingServiceParent()
721 self._lease_secret = itertools.count()
723 return self.sparent.stopService()
725 def workdir(self, name):
726 basedir = os.path.join("storage", "MutableServer", name)
729 def create(self, name):
730 workdir = self.workdir(name)
731 ss = StorageServer(workdir, "\x00" * 20)
732 ss.setServiceParent(self.sparent)
735 def test_create(self):
736 ss = self.create("test_create")
def write_enabler(self, we_tag):
    """Return the write-enabler secret for we_tag.

    The value is hashutil.tagged_hash with the fixed tag "we_blah", applied
    to we_tag exactly as given (no conversion is done here).
    """
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Return the lease-renewal secret for tag.

    tag is converted with str() and then passed to hashutil.tagged_hash
    with the fixed tag "renew_blah".
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return the lease-cancel secret for tag.

    tag is converted with str() and then passed to hashutil.tagged_hash
    with the fixed tag "cancel_blah".
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
747 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
748 write_enabler = self.write_enabler(we_tag)
749 renew_secret = self.renew_secret(lease_tag)
750 cancel_secret = self.cancel_secret(lease_tag)
751 rstaraw = ss.remote_slot_testv_and_readv_and_writev
752 testandwritev = dict( [ (shnum, ([], [], None) )
753 for shnum in sharenums ] )
755 rc = rstaraw(storage_index,
756 (write_enabler, renew_secret, cancel_secret),
759 (did_write, readv_data) = rc
760 self.failUnless(did_write)
761 self.failUnless(isinstance(readv_data, dict))
762 self.failUnlessEqual(len(readv_data), 0)
764 def test_bad_magic(self):
765 ss = self.create("test_bad_magic")
766 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
767 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
772 read = ss.remote_slot_readv
773 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
774 read, "si1", [0], [(0,10)])
775 self.failUnless(" had magic " in str(e), e)
776 self.failUnless(" but we wanted " in str(e), e)
778 def test_container_size(self):
779 ss = self.create("test_container_size")
780 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
782 read = ss.remote_slot_readv
783 rstaraw = ss.remote_slot_testv_and_readv_and_writev
784 secrets = ( self.write_enabler("we1"),
785 self.renew_secret("we1"),
786 self.cancel_secret("we1") )
787 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
788 answer = rstaraw("si1", secrets,
789 {0: ([], [(0,data)], len(data)+12)},
791 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
793 # trying to make the container too large will raise an exception
794 TOOBIG = MutableShareFile.MAX_SIZE + 10
795 self.failUnlessRaises(DataTooLargeError,
796 rstaraw, "si1", secrets,
797 {0: ([], [(0,data)], TOOBIG)},
800 # it should be possible to make the container smaller, although at
801 # the moment this doesn't actually affect the share, unless the
802 # container size is dropped to zero, in which case the share is
804 answer = rstaraw("si1", secrets,
805 {0: ([], [(0,data)], len(data)+8)},
807 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
809 answer = rstaraw("si1", secrets,
810 {0: ([], [(0,data)], 0)},
812 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
814 read_answer = read("si1", [0], [(0,10)])
815 self.failUnlessEqual(read_answer, {})
817 def test_allocate(self):
818 ss = self.create("test_allocate")
819 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
822 read = ss.remote_slot_readv
823 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
825 self.failUnlessEqual(read("si1", [], [(0, 10)]),
826 {0: [""], 1: [""], 2: [""]})
827 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
831 secrets = ( self.write_enabler("we1"),
832 self.renew_secret("we1"),
833 self.cancel_secret("we1") )
834 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
835 write = ss.remote_slot_testv_and_readv_and_writev
836 answer = write("si1", secrets,
837 {0: ([], [(0,data)], None)},
839 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
841 self.failUnlessEqual(read("si1", [0], [(0,20)]),
842 {0: ["00000000001111111111"]})
843 self.failUnlessEqual(read("si1", [0], [(95,10)]),
845 #self.failUnlessEqual(s0.remote_get_length(), 100)
847 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
848 f = self.failUnlessRaises(BadWriteEnablerError,
849 write, "si1", bad_secrets,
851 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
853 # this testv should fail
854 answer = write("si1", secrets,
855 {0: ([(0, 12, "eq", "444444444444"),
856 (20, 5, "eq", "22222"),
863 self.failUnlessEqual(answer, (False,
864 {0: ["000000000011", "22222"],
868 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
871 answer = write("si1", secrets,
872 {0: ([(10, 5, "lt", "11111"),
879 self.failUnlessEqual(answer, (False,
884 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
887 def test_operators(self):
888 # test operators, the data we're comparing is '11111' in all cases.
889 # test both fail+pass, reset data after each one.
890 ss = self.create("test_operators")
892 secrets = ( self.write_enabler("we1"),
893 self.renew_secret("we1"),
894 self.cancel_secret("we1") )
895 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
896 write = ss.remote_slot_testv_and_readv_and_writev
897 read = ss.remote_slot_readv
900 write("si1", secrets,
901 {0: ([], [(0,data)], None)},
907 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
912 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
913 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
914 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
917 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
922 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
923 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
926 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
931 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
932 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
936 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
941 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
942 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
945 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
950 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
951 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
954 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
959 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
960 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
964 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
969 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
970 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
973 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
978 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
979 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
983 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
988 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
989 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
992 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
997 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
998 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1002 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1007 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1008 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1011 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1016 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1017 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1020 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1025 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1026 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1030 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1035 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1036 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1039 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1044 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1045 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1048 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1053 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1054 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1057 # finally, test some operators against empty shares
1058 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1063 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1064 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Write a distinct 100-character digit string to each of shares 0, 1 and 2 of
# slot "si1", check that the write reports (True, {}), then read the first 10
# bytes and compare the answer against the expected per-share data.
# NOTE(review): this listing skips original lines 1080 and 1085-1086, so the
# tail of the write() call and the rest of the expected answer are not
# visible here.
1067 def test_readv(self):
1068 ss = self.create("test_readv")
1069 secrets = ( self.write_enabler("we1"),
1070 self.renew_secret("we1"),
1071 self.cancel_secret("we1") )
1072 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1073 write = ss.remote_slot_testv_and_readv_and_writev
1074 read = ss.remote_slot_readv
# 'data' is rebound here: the string built above is replaced by a list of
# three 100-character strings ("000...", "111...", "222...").
1075 data = [("%d" % i) * 100 for i in range(3)]
1076 rc = write("si1", secrets,
1077 {0: ([], [(0,data[0])], None),
1078 1: ([], [(0,data[1])], None),
1079 2: ([], [(0,data[2])], None),
1081 self.failUnlessEqual(rc, (True, {}))
# the share list passed to read() is empty -- presumably meaning "all
# shares"; confirm against remote_slot_readv
1083 answer = read("si1", [], [(0, 10)])
1084 self.failUnlessEqual(answer, {0: ["0"*10],
# Assert that two lease sequences have the same length and that the leases
# agree on owner_num, renew_secret, cancel_secret and nodeid. Unlike
# compare_leases(), expiration_time is not compared.
# NOTE(review): the lines binding 'a' and 'b' inside the loop (original
# lines 1091-1092) are missing from this listing; presumably they are
# leases_a[i] and leases_b[i] -- confirm.
1088 def compare_leases_without_timestamps(self, leases_a, leases_b):
1089 self.failUnlessEqual(len(leases_a), len(leases_b))
1090 for i in range(len(leases_a)):
1093 self.failUnlessEqual(a.owner_num, b.owner_num)
1094 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1095 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1096 self.failUnlessEqual(a.nodeid, b.nodeid)
# Assert that two lease sequences have the same length and that the leases
# agree on owner_num, renew_secret, cancel_secret, nodeid and
# expiration_time.
# NOTE(review): the lines binding 'a' and 'b' inside the loop (original
# lines 1101-1102) are missing from this listing; presumably they are
# leases_a[i] and leases_b[i] -- confirm.
1098 def compare_leases(self, leases_a, leases_b):
1099 self.failUnlessEqual(len(leases_a), len(leases_b))
1100 for i in range(len(leases_a)):
1103 self.failUnlessEqual(a.owner_num, b.owner_num)
1104 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1105 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1106 self.failUnlessEqual(a.nodeid, b.nodeid)
1107 self.failUnlessEqual(a.expiration_time, b.expiration_time)
# Exercise lease handling on slot "si1": leases are created by writes and by
# remote_add_lease, renewed with remote_renew_lease, and removed with
# remote_cancel_lease. After each step the lease records are read back from
# share 0 through MutableShareFile.get_leases() and checked.
# NOTE(review): this listing skips several short original lines (for example
# 1111, 1127, 1166, 1187, 1196), so some statements below appear without
# their opening or closing line.
1109 def test_leases(self):
1110 ss = self.create("test_leases")
# The return below is the body of the secrets(n) helper called throughout
# this test; its def line (1111) is not in this listing. The write enabler
# is the same for every n, while the renew/cancel secrets vary with n.
1112 return ( self.write_enabler("we1"),
1113 self.renew_secret("we1-%d" % n),
1114 self.cancel_secret("we1-%d" % n) )
1115 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1116 write = ss.remote_slot_testv_and_readv_and_writev
1117 read = ss.remote_slot_readv
1118 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1119 self.failUnlessEqual(rc, (True, {}))
1121 # create a random non-numeric file in the bucket directory, to
1122 # exercise the code that's supposed to ignore those.
1123 bucket_dir = os.path.join(self.workdir("test_leases"),
1124 "shares", storage_index_to_dir("si1"))
1125 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1126 f.write("you ought to be ignoring me\n")
1129 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1130 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1132 # add-lease on a missing storage index is silently ignored
1133 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1135 # re-allocate the slots and use the same secrets, that should update
1137 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1138 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renewing with the existing renew secret must not add a lease
1141 ss.remote_renew_lease("si1", secrets(0)[1])
1142 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1144 # now allocate them with a bunch of different secrets, to trigger the
1145 # extended lease code. Use add_lease for one of them.
1146 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1147 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1148 secrets2 = secrets(2)
1149 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1150 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1151 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1152 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1153 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1155 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1157 # cancel one of them
1158 ss.remote_cancel_lease("si1", secrets(5)[2])
1159 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1161 all_leases = list(s0.get_leases())
1162 # and write enough data to expand the container, forcing the server
1163 # to move the leases
1164 write("si1", secrets(0),
1165 {0: ([], [(0,data)], 200), },
1168 # read back the leases, make sure they're still intact.
1169 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1171 ss.remote_renew_lease("si1", secrets(0)[1])
1172 ss.remote_renew_lease("si1", secrets(1)[1])
1173 ss.remote_renew_lease("si1", secrets(2)[1])
1174 ss.remote_renew_lease("si1", secrets(3)[1])
1175 ss.remote_renew_lease("si1", secrets(4)[1])
1176 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1177 # get a new copy of the leases, with the current timestamps. Reading
1178 # data and failing to renew/cancel leases should leave the timestamps
1180 all_leases = list(s0.get_leases())
1181 # renewing with a bogus token should prompt an error message
1183 # examine the exception thus raised, make sure the old nodeid is
1184 # present, to provide for share migration
1185 e = self.failUnlessRaises(IndexError,
1186 ss.remote_renew_lease, "si1",
# NOTE(review): the binding of 'e_s' (original line 1188) is not in this
# listing; presumably it is str(e) -- confirm.
1189 self.failUnless("Unable to renew non-existent lease" in e_s)
1190 self.failUnless("I have leases accepted by nodeids:" in e_s)
1191 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1193 # same for cancelling
1194 self.failUnlessRaises(IndexError,
1195 ss.remote_cancel_lease, "si1",
1197 self.compare_leases(all_leases, list(s0.get_leases()))
1199 # reading shares should not modify the timestamp
1200 read("si1", [], [(0,200)])
1201 self.compare_leases(all_leases, list(s0.get_leases()))
# writes that grow the share are only checked without timestamps
1203 write("si1", secrets(0),
1204 {0: ([], [(200, "make me bigger")], None)}, [])
1205 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1207 write("si1", secrets(0),
1208 {0: ([], [(500, "make me really bigger")], None)}, [])
1209 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1211 # now cancel them all
# (leases 0-3 are cancelled here; the secrets(4) lease is cancelled further
# down, after the checks that the slot survives with one lease left)
1212 ss.remote_cancel_lease("si1", secrets(0)[2])
1213 ss.remote_cancel_lease("si1", secrets(1)[2])
1214 ss.remote_cancel_lease("si1", secrets(2)[2])
1215 ss.remote_cancel_lease("si1", secrets(3)[2])
1217 # the slot should still be there
1218 remaining_shares = read("si1", [], [(0,10)])
1219 self.failUnlessEqual(len(remaining_shares), 1)
1220 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1222 # cancelling a non-existent lease should raise an IndexError
1223 self.failUnlessRaises(IndexError,
1224 ss.remote_cancel_lease, "si1", "nonsecret")
1226 # and the slot should still be there
1227 remaining_shares = read("si1", [], [(0,10)])
1228 self.failUnlessEqual(len(remaining_shares), 1)
1229 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1231 ss.remote_cancel_lease("si1", secrets(4)[2])
1232 # now the slot should be gone
1233 no_shares = read("si1", [], [(0,10)])
1234 self.failUnlessEqual(no_shares, {})
1236 # cancelling a lease on a non-existent share should raise an IndexError
1237 self.failUnlessRaises(IndexError,
1238 ss.remote_cancel_lease, "si2", "nonsecret")
# Delete shares 0, 1 and 2 of slot "si1" one at a time by setting their size
# to zero, checking the write answer and a fresh read after each deletion,
# and finally check that the bucket directory is gone while its prefix
# directory still exists.
# NOTE(review): this listing skips the continuation lines of the allocate(),
# writev() and readv-comparison calls, as well as the binding of 'prefix'
# (original line 1279), so those statements are only partly visible.
1240 def test_remove(self):
1241 ss = self.create("test_remove")
1242 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1244 readv = ss.remote_slot_readv
1245 writev = ss.remote_slot_testv_and_readv_and_writev
1246 secrets = ( self.write_enabler("we1"),
1247 self.renew_secret("we1"),
1248 self.cancel_secret("we1") )
1249 # delete sh0 by setting its size to zero
1250 answer = writev("si1", secrets,
1253 # the answer should mention all the shares that existed before the
1255 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1256 # but a new read should show only sh1 and sh2
1257 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1260 # delete sh1 by setting its size to zero
1261 answer = writev("si1", secrets,
1264 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1265 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1268 # delete sh2 by setting its size to zero
1269 answer = writev("si1", secrets,
1272 self.failUnlessEqual(answer, (True, {2:[]}) )
1273 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1275 # and the bucket directory should now be gone
1276 si = base32.b2a("si1")
1277 # note: this is a detail of the storage server implementation, and
1278 # may change in the future
1280 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1281 bucketdir = os.path.join(prefixdir, si)
1282 self.failUnless(os.path.exists(prefixdir))
1283 self.failIf(os.path.exists(bucketdir))
# Tests for the latency statistics kept by StorageServer: samples are fed in
# with add_latency() and the summary from get_latencies() is checked.
# NOTE(review): this listing skips several short original lines (the
# setUp/tearDown def lines, the returns of workdir()/create(), and line
# 1309), so some statements below appear without their enclosing def or loop.
1285 class Stats(unittest.TestCase):
# fixture: a fresh service parent and a counter used for lease secrets
# NOTE(review): 'itertools' is not among the imports visible at the top of
# this listing -- confirm it is imported.
1288 self.sparent = LoggingServiceParent()
1289 self._lease_secret = itertools.count()
# fixture teardown: stop the service parent (returns whatever stopService()
# returns)
1291 return self.sparent.stopService()
# Build the per-test working directory path under storage/Server/.
1293 def workdir(self, name):
1294 basedir = os.path.join("storage", "Server", name)
# Create a StorageServer rooted at workdir(name) with an all-zero 20-byte
# second argument, and attach it to the service parent.
1297 def create(self, name):
1298 workdir = self.workdir(name)
1299 ss = StorageServer(workdir, "\x00" * 20)
1300 ss.setServiceParent(self.sparent)
# Feed known latency samples into four categories and check the sample
# counts, means and percentiles reported for each.
1303 def test_latencies(self):
1304 ss = self.create("test_latencies")
# 10000 samples are added, but the assertions below expect only 1000 to be
# retained, with a mean near 9500 (i.e. the most recent 1000 values)
1305 for i in range(10000):
1306 ss.add_latency("allocate", 1.0 * i)
1307 for i in range(1000):
1308 ss.add_latency("renew", 1.0 * i)
# NOTE(review): the loop header for the "cancel" samples (original line
# 1309) is not in this listing; the assertions below expect 10 samples.
1310 ss.add_latency("cancel", 2.0 * i)
1311 ss.add_latency("get", 5.0)
1313 output = ss.get_latencies()
1315 self.failUnlessEqual(sorted(output.keys()),
1316 sorted(["allocate", "renew", "cancel", "get"]))
1317 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1318 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1319 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1320 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1321 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1322 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1323 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1324 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1325 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1327 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1328 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1329 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1330 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1331 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1332 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1333 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1334 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1335 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
# with only 10 samples, the upper percentiles all resolve to the same value
1337 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1338 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1339 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1340 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1341 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1342 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1343 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1344 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1345 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
# a single sample: the mean and every percentile equal that sample
1347 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1348 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1349 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1350 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1351 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1352 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1353 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1354 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1355 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
# Replace every <...> markup tag in s with a single space, then collapse each
# run of whitespace into one space.
# NOTE(review): the def line and the return for these two statements
# (original lines 1357 and 1360) are not in this listing; presumably they
# form the remove_tags(s) helper that the tests below call -- confirm.
1358 s = re.sub(r'<[^>]*>', ' ', s)
1359 s = re.sub(r'\s+', ' ', s)
# BucketCountingCrawler subclass used by the tests: after the base class has
# handled a finished prefix, it takes the next Deferred off self.hook_ds.
# NOTE(review): original lines 1365 and 1367 are not in this listing;
# presumably they guard against an empty hook_ds list and fire the popped
# Deferred -- confirm. hook_ds itself is assigned by the test
# (ss.bucket_counter.hook_ds = [...]), not by this class.
1362 class MyBucketCountingCrawler(BucketCountingCrawler):
1363 def finished_prefix(self, cycle, prefix):
1364 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1366 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer that uses MyBucketCountingCrawler as its bucket counter."""

    def add_bucket_counter(self):
        """Create the bucket-counting crawler and attach it to this server.

        The crawler keeps its state in "bucket_counter.state" inside
        self.storedir, is stored as self.bucket_counter, and is parented to
        this server.
        """
        crawler = MyBucketCountingCrawler(
            self, os.path.join(self.storedir, "bucket_counter.state"))
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
# Tests for the bucket-counting crawler owned by StorageServer
# (ss.bucket_counter) and for how its progress is rendered by StorageStatus.
# NOTE(review): this listing skips several short original lines (the
# setUp/tearDown def lines, the "def _watch():" lines, and the final
# "return d"-style lines), so some statements below appear without their
# enclosing def.
1375 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
# fixture: a running MultiService that the storage servers get attached to
1378 self.s = service.MultiService()
1379 self.s.startService()
# fixture teardown: stop that service
1381 return self.s.stopService()
# Render the status page before the crawl starts, right after the first
# prefix, and after a full cycle, checking the text at each stage.
1383 def test_bucket_counter(self):
1384 basedir = "storage/BucketCounter/bucket_counter"
1385 fileutil.make_dirs(basedir)
1386 ss = StorageServer(basedir, "\x00" * 20)
1387 # to make sure we capture the bucket-counting-crawler in the middle
1388 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1389 # also make it start sooner than usual.
1390 ss.bucket_counter.slow_start = 0
1391 orig_cpu_slice = ss.bucket_counter.cpu_slice
1392 ss.bucket_counter.cpu_slice = 0
1393 ss.setServiceParent(self.s)
1395 w = StorageStatus(ss)
1397 # this sample is before the crawler has started doing anything
1398 html = w.renderSynchronously()
1399 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1400 s = remove_tags(html)
1401 self.failUnless("Accepting new shares: Yes" in s, s)
1402 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1403 self.failUnless("Total buckets: Not computed yet" in s, s)
1404 self.failUnless("Next crawl in" in s, s)
1406 # give the bucket-counting-crawler one tick to get started. The
1407 # cpu_slice=0 will force it to yield right after it processes the
1410 d = fireEventually()
1411 def _check(ignored):
1412 # are we really right after the first prefix?
1413 state = ss.bucket_counter.get_state()
1414 self.failUnlessEqual(state["last-complete-prefix"],
1415 ss.bucket_counter.prefixes[0])
1416 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1417 html = w.renderSynchronously()
1418 s = remove_tags(html)
1419 self.failUnless(" Current crawl " in s, s)
1420 self.failUnless(" (next work in " in s, s)
1421 d.addCallback(_check)
1423 # now give it enough time to complete a full cycle
# (the return below is the body of the _watch predicate handed to
# self.poll(); its def line is not in this listing)
1425 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1426 d.addCallback(lambda ignored: self.poll(_watch))
1427 def _check2(ignored):
# restore the crawler's original cpu_slice before the final checks
1428 ss.bucket_counter.cpu_slice = orig_cpu_slice
1429 html = w.renderSynchronously()
1430 s = remove_tags(html)
1431 self.failUnless("Total buckets: 0 (the number of" in s, s)
1432 self.failUnless("Next crawl in 59 minutes" in s, s)
1433 d.addCallback(_check2)
# Inject bogus entries into the crawler state in mid-cycle and check that
# they are gone once the cycle has finished.
1436 def test_bucket_counter_cleanup(self):
1437 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1438 fileutil.make_dirs(basedir)
1439 ss = StorageServer(basedir, "\x00" * 20)
1440 # to make sure we capture the bucket-counting-crawler in the middle
1441 # of a cycle, we reach in and reduce its maximum slice time to 0.
1442 ss.bucket_counter.slow_start = 0
1443 orig_cpu_slice = ss.bucket_counter.cpu_slice
1444 ss.bucket_counter.cpu_slice = 0
1445 ss.setServiceParent(self.s)
1447 d = fireEventually()
1449 def _after_first_prefix(ignored):
1450 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1451 # now sneak in and mess with its state, to make sure it cleans up
1452 # properly at the end of the cycle
1453 state = ss.bucket_counter.state
1454 self.failUnlessEqual(state["last-complete-prefix"],
1455 ss.bucket_counter.prefixes[0])
1456 state["bucket-counts"][-12] = {}
1457 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1458 ss.bucket_counter.save_state()
1459 d.addCallback(_after_first_prefix)
1461 # now give it enough time to complete a cycle
# (the return below is the body of the _watch predicate handed to
# self.poll(); its def line is not in this listing)
1463 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1464 d.addCallback(lambda ignored: self.poll(_watch))
1465 def _check2(ignored):
1466 ss.bucket_counter.cpu_slice = orig_cpu_slice
1467 s = ss.bucket_counter.get_state()
1468 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1469 self.failIf("bogusprefix!" in s["storage-index-samples"],
1470 s["storage-index-samples"].keys())
1471 d.addCallback(_check2)
# Check the "next work" / "ETA" text on the status page at three points,
# each triggered by one of the hook Deferreds installed on the crawler.
1474 def test_bucket_counter_eta(self):
1475 basedir = "storage/BucketCounter/bucket_counter_eta"
1476 fileutil.make_dirs(basedir)
1477 ss = MyStorageServer(basedir, "\x00" * 20)
1478 ss.bucket_counter.slow_start = 0
1479 # these will be fired inside finished_prefix()
1480 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1481 w = StorageStatus(ss)
1483 d = defer.Deferred()
1485 def _check_1(ignored):
1486 # no ETA is available yet
1487 html = w.renderSynchronously()
1488 s = remove_tags(html)
1489 self.failUnlessIn("complete (next work", s)
1491 def _check_2(ignored):
1492 # one prefix has finished, so an ETA based upon that elapsed time
1493 # should be available.
1494 html = w.renderSynchronously()
1495 s = remove_tags(html)
1496 self.failUnlessIn("complete (ETA ", s)
1498 def _check_3(ignored):
1499 # two prefixes have finished
1500 html = w.renderSynchronously()
1501 s = remove_tags(html)
1502 self.failUnlessIn("complete (ETA ", s)
# a failure in any of the checks is forwarded to d through d.errback
1505 hooks[0].addCallback(_check_1).addErrback(d.errback)
1506 hooks[1].addCallback(_check_2).addErrback(d.errback)
1507 hooks[2].addCallback(_check_3).addErrback(d.errback)
1509 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that a test can stall after its first bucket.

    While stop_after_first_bucket is set, the next processed bucket clears
    the flag and sets cpu_slice to -1.0; once the flag is clear, yielding()
    sets cpu_slice to 500.
    """

    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        """Process the bucket as the base class does, then apply the stall.

        The base class result is discarded, so this always returns None.
        """
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # One-shot flag: clear it, then make the time slice negative.
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        """Set cpu_slice to 500 unless the one-shot stall is still armed."""
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
# Bare attribute container: No_ST_BLOCKS_LeaseCheckingCrawler creates an
# instance and copies stat attributes onto it with setattr().
# NOTE(review): the class body (original line 1524) is not in this listing.
1523 class BrokenStatResults:
# LeaseCheckingCrawler variant that builds a BrokenStatResults copy of a
# stat-like object 's', attribute by attribute, with special handling for
# names that start with "_" and for "st_blocks".
# NOTE(review): this listing skips original lines 1526-1527, 1531, 1533 and
# 1535; presumably they are the method def, the binding of 's', a "continue"
# under each 'if', and the final return -- confirm. If so, the copy carries
# every public attribute except st_blocks.
1525 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1528 bsr = BrokenStatResults()
1529 for attrname in dir(s):
1530 if attrname.startswith("_"):
1532 if attrname == "st_blocks":
1534 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    """StorageServer whose LeaseCheckerClass is InstrumentedLeaseCheckingCrawler."""

    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer whose LeaseCheckerClass is No_ST_BLOCKS_LeaseCheckingCrawler."""

    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1542 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1545 self.s = service.MultiService()
1546 self.s.startService()
1548 return self.s.stopService()
# Populate storage server 'ss' with four storage indexes -- two written
# through remote_allocate_buckets and two through
# remote_slot_testv_and_readv_and_writev -- and add one extra lease to
# immutable_si_1 and to mutable_si_3. The storage indexes and all six
# renew/cancel secrets are saved on self (sis, renew_secrets,
# cancel_secrets) for the tests to use.
# NOTE(review): this listing skips several short original lines (1551, 1573,
# 1575, 1577, 1580, 1582), presumably including "def make(si):", the binding
# of 'sharenums', the tails of the allocate calls and the bucket close calls
# -- confirm.
1550 def make_shares(self, ss):
# (body of the make(si) helper used below; its def line is not in this
# listing) secrets are derived from the storage index by tagged hashing
1552 return (si, hashutil.tagged_hash("renew", si),
1553 hashutil.tagged_hash("cancel", si))
1554 def make_mutable(si):
1555 return (si, hashutil.tagged_hash("renew", si),
1556 hashutil.tagged_hash("cancel", si),
1557 hashutil.tagged_hash("write-enabler", si))
1558 def make_extra_lease(si, num):
1559 return (hashutil.tagged_hash("renew-%d" % num, si),
1560 hashutil.tagged_hash("cancel-%d" % num, si))
1562 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1563 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1564 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1565 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1566 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1567 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1569 canary = FakeCanary()
1570 # note: 'tahoe debug dump-share' will not handle this file, since the
1571 # inner contents are not a valid CHK share
1572 data = "\xff" * 1000
1574 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1576 w[0].remote_write(0, data)
1579 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1581 w[0].remote_write(0, data)
1583 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1585 writev = ss.remote_slot_testv_and_readv_and_writev
1586 writev(mutable_si_2, (we2, rs2, cs2),
1587 {0: ([], [(0,data)], len(data))}, [])
1588 writev(mutable_si_3, (we3, rs3, cs3),
1589 {0: ([], [(0,data)], len(data))}, [])
1590 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
# order matters to callers: tests index renew_secrets/cancel_secrets by
# position (0: si_0, 1: si_1, 2: si_1 extra, 3: si_2, 4: si_3, 5: si_3 extra)
1592 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1593 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1594 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
# Run the lease-checking crawler over the shares from make_shares() with
# expiration left at its default, and check the crawler state and the
# rendered status (HTML and JSON) at three points: before the crawl starts,
# right after the first bucket, and after the first full cycle. Every
# "recovered" counter is expected to be zero and the lease counts unchanged.
# NOTE(review): this listing skips several short original lines (for example
# 1602-1603, 1614-1615, 1617, 1694, 1699), so a few statements below appear
# without their opening line or binding.
1596 def test_basic(self):
1597 basedir = "storage/LeaseCrawler/basic"
1598 fileutil.make_dirs(basedir)
1599 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1600 # make it start sooner than usual.
1601 lc = ss.lease_checker
# arm the one-shot stall provided by InstrumentedLeaseCheckingCrawler
1604 lc.stop_after_first_bucket = True
1605 webstatus = StorageStatus(ss)
1607 # create a few shares, with some leases on them
1608 self.make_shares(ss)
1609 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1611 # add a non-sharefile to exercise another code path
1612 fn = os.path.join(ss.sharedir,
1613 storage_index_to_dir(immutable_si_0),
1616 f.write("I am not a share.\n")
1619 # this is before the crawl has started, so we're not in a cycle yet
1620 initial_state = lc.get_state()
1621 self.failIf(lc.get_progress()["cycle-in-progress"])
1622 self.failIf("cycle-to-date" in initial_state)
1623 self.failIf("estimated-remaining-cycle" in initial_state)
1624 self.failIf("estimated-current-cycle" in initial_state)
1625 self.failUnless("history" in initial_state)
1626 self.failUnlessEqual(initial_state["history"], {})
1628 ss.setServiceParent(self.s)
1632 d = fireEventually()
1634 # now examine the state right after the first bucket has been
1636 def _after_first_bucket(ignored):
1637 initial_state = lc.get_state()
1638 self.failUnless("cycle-to-date" in initial_state)
1639 self.failUnless("estimated-remaining-cycle" in initial_state)
1640 self.failUnless("estimated-current-cycle" in initial_state)
1641 self.failUnless("history" in initial_state)
1642 self.failUnlessEqual(initial_state["history"], {})
1644 so_far = initial_state["cycle-to-date"]
1645 self.failUnlessEqual(so_far["expiration-enabled"], False)
1646 self.failUnless("configured-expiration-mode" in so_far)
1647 self.failUnless("lease-age-histogram" in so_far)
1648 lah = so_far["lease-age-histogram"]
1649 self.failUnlessEqual(type(lah), list)
1650 self.failUnlessEqual(len(lah), 1)
# DAY is a name defined outside this listing
1651 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1652 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1653 self.failUnlessEqual(so_far["corrupt-shares"], [])
1654 sr1 = so_far["space-recovered"]
1655 self.failUnlessEqual(sr1["examined-buckets"], 1)
1656 self.failUnlessEqual(sr1["examined-shares"], 1)
1657 self.failUnlessEqual(sr1["actual-shares"], 0)
1658 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1659 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1660 left = initial_state["estimated-remaining-cycle"]
1661 sr2 = left["space-recovered"]
1662 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1663 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1664 self.failIfEqual(sr2["actual-shares"], None)
1665 self.failIfEqual(sr2["configured-diskbytes"], None)
1666 self.failIfEqual(sr2["original-sharebytes"], None)
1667 d.addCallback(_after_first_bucket)
1668 d.addCallback(lambda ign: self.render1(webstatus))
1669 def _check_html_in_cycle(html):
1670 s = remove_tags(html)
1671 self.failUnlessIn("So far, this cycle has examined "
1672 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1673 self.failUnlessIn("and has recovered: "
1674 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1675 "0 B (0 B / 0 B)", s)
1676 self.failUnlessIn("If expiration were enabled, "
1677 "we would have recovered: "
1678 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1679 " 0 B (0 B / 0 B) by now", s)
1680 self.failUnlessIn("and the remainder of this cycle "
1681 "would probably recover: "
1682 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1683 " 0 B (0 B / 0 B)", s)
1684 self.failUnlessIn("and the whole cycle would probably recover: "
1685 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1686 " 0 B (0 B / 0 B)", s)
1687 self.failUnlessIn("if we were strictly using each lease's default "
1688 "31-day lease lifetime", s)
1689 self.failUnlessIn("this cycle would be expected to recover: ", s)
1690 d.addCallback(_check_html_in_cycle)
1692 # wait for the crawler to finish the first cycle. Nothing should have
# (the return below is the body of the _wait predicate handed to
# self.poll(); its def line is not in this listing)
1695 return bool(lc.get_state()["last-cycle-finished"] is not None)
1696 d.addCallback(lambda ign: self.poll(_wait))
1698 def _after_first_cycle(ignored):
# NOTE(review): the binding of 's' (original line 1699) is not in this
# listing; presumably it is lc.get_state() -- confirm.
1700 self.failIf("cycle-to-date" in s)
1701 self.failIf("estimated-remaining-cycle" in s)
1702 self.failIf("estimated-current-cycle" in s)
1703 last = s["history"][0]
1704 self.failUnless("cycle-start-finish-times" in last)
1705 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1706 self.failUnlessEqual(last["expiration-enabled"], False)
1707 self.failUnless("configured-expiration-mode" in last)
1709 self.failUnless("lease-age-histogram" in last)
1710 lah = last["lease-age-histogram"]
1711 self.failUnlessEqual(type(lah), list)
1712 self.failUnlessEqual(len(lah), 1)
# six leases in total: make_shares() creates four shares, two of which carry
# an extra lease
1713 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1715 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1716 self.failUnlessEqual(last["corrupt-shares"], [])
1718 rec = last["space-recovered"]
1719 self.failUnlessEqual(rec["examined-buckets"], 4)
1720 self.failUnlessEqual(rec["examined-shares"], 4)
1721 self.failUnlessEqual(rec["actual-buckets"], 0)
1722 self.failUnlessEqual(rec["original-buckets"], 0)
1723 self.failUnlessEqual(rec["configured-buckets"], 0)
1724 self.failUnlessEqual(rec["actual-shares"], 0)
1725 self.failUnlessEqual(rec["original-shares"], 0)
1726 self.failUnlessEqual(rec["configured-shares"], 0)
1727 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1728 self.failUnlessEqual(rec["original-diskbytes"], 0)
1729 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1730 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1731 self.failUnlessEqual(rec["original-sharebytes"], 0)
1732 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1734 def _get_sharefile(si):
1735 return list(ss._iter_share_files(si))[0]
1736 def count_leases(si):
1737 return len(list(_get_sharefile(si).get_leases()))
# the lease counts must be exactly what make_shares() created
1738 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1739 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1740 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1741 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1742 d.addCallback(_after_first_cycle)
1743 d.addCallback(lambda ign: self.render1(webstatus))
1744 def _check_html(html):
1745 s = remove_tags(html)
1746 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1747 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1748 "but expiration was not enabled", s)
1749 d.addCallback(_check_html)
1750 d.addCallback(lambda ign: self.render_json(webstatus))
1751 def _check_json(json):
1752 data = simplejson.loads(json)
1753 self.failUnless("lease-checker" in data)
1754 self.failUnless("lease-checker-progress" in data)
1755 d.addCallback(_check_json)
# Overwrite the expiration time of the lease on share file 'sf' whose
# renew_secret matches, by rewriting that lease record directly in the file
# at sf.home. Raises IndexError when no lease matches.
# NOTE(review): original lines 1767-1768 are not in this listing; presumably
# they close the file and return after the first match, so that the raise
# below is only reached when no lease matched -- confirm.
1758 def backdate_lease(self, sf, renew_secret, new_expire_time):
1759 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1760 # "renew" a lease with a new_expire_time that is older than what the
1761 # current lease has), so we have to reach inside it.
1762 for i,lease in enumerate(sf.get_leases()):
1763 if lease.renew_secret == renew_secret:
1764 lease.expiration_time = new_expire_time
# open for in-place binary update so only this lease record is rewritten
1765 f = open(sf.home, 'rb+')
1766 sf._write_lease_record(f, i, lease)
1769 raise IndexError("unable to renew non-existent lease")
1771 def test_expire_age(self):
1772 basedir = "storage/LeaseCrawler/expire_age"
1773 fileutil.make_dirs(basedir)
1774 # setting expiration_time to 2000 means that any lease which is more
1775 # than 2000s old will be expired.
1776 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1777 expiration_enabled=True,
1778 expiration_mode="age",
1779 expiration_override_lease_duration=2000)
1780 # make it start sooner than usual.
1781 lc = ss.lease_checker
1783 lc.stop_after_first_bucket = True
1784 webstatus = StorageStatus(ss)
1786 # create a few shares, with some leases on them
1787 self.make_shares(ss)
1788 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1790 def count_shares(si):
1791 return len(list(ss._iter_share_files(si)))
1792 def _get_sharefile(si):
1793 return list(ss._iter_share_files(si))[0]
1794 def count_leases(si):
1795 return len(list(_get_sharefile(si).get_leases()))
1797 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1798 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1799 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1800 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1801 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1802 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1803 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1804 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1806 # artificially crank back the expiration time on the first lease of
1807 # each share, to make it look like it expired already (age=1000s).
1808 # Some shares have an extra lease which is set to expire at the
1809 # default time in 31 days from now (age=31days). We then run the
1810 # crawler, which will expire the first lease, making some shares get
1811 # deleted and others stay alive (with one remaining lease)
1814 sf0 = _get_sharefile(immutable_si_0)
1815 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1816 sf0_size = os.stat(sf0.home).st_size
1818 # immutable_si_1 gets an extra lease
1819 sf1 = _get_sharefile(immutable_si_1)
1820 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1822 sf2 = _get_sharefile(mutable_si_2)
1823 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1824 sf2_size = os.stat(sf2.home).st_size
1826 # mutable_si_3 gets an extra lease
1827 sf3 = _get_sharefile(mutable_si_3)
1828 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1830 ss.setServiceParent(self.s)
1832 d = fireEventually()
1833 # examine the state right after the first bucket has been processed
1834 def _after_first_bucket(ignored):
1835 p = lc.get_progress()
1836 self.failUnless(p["cycle-in-progress"])
1837 d.addCallback(_after_first_bucket)
1838 d.addCallback(lambda ign: self.render1(webstatus))
1839 def _check_html_in_cycle(html):
1840 s = remove_tags(html)
1841 # the first bucket encountered gets deleted, and its prefix
1842 # happens to be about 1/5th of the way through the ring, so the
1843 # predictor thinks we'll have 5 shares and that we'll delete them
1844 # all. This part of the test depends upon the SIs landing right
1845 # where they do now.
1846 self.failUnlessIn("The remainder of this cycle is expected to "
1847 "recover: 4 shares, 4 buckets", s)
1848 self.failUnlessIn("The whole cycle is expected to examine "
1849 "5 shares in 5 buckets and to recover: "
1850 "5 shares, 5 buckets", s)
1851 d.addCallback(_check_html_in_cycle)
1853 # wait for the crawler to finish the first cycle. Two shares should
1856 return bool(lc.get_state()["last-cycle-finished"] is not None)
1857 d.addCallback(lambda ign: self.poll(_wait))
1859 def _after_first_cycle(ignored):
1860 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1861 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1862 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1863 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1864 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1865 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1868 last = s["history"][0]
1870 self.failUnlessEqual(last["expiration-enabled"], True)
1871 self.failUnlessEqual(last["configured-expiration-mode"],
1872 ("age", 2000, None, ("mutable", "immutable")))
1873 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1875 rec = last["space-recovered"]
1876 self.failUnlessEqual(rec["examined-buckets"], 4)
1877 self.failUnlessEqual(rec["examined-shares"], 4)
1878 self.failUnlessEqual(rec["actual-buckets"], 2)
1879 self.failUnlessEqual(rec["original-buckets"], 2)
1880 self.failUnlessEqual(rec["configured-buckets"], 2)
1881 self.failUnlessEqual(rec["actual-shares"], 2)
1882 self.failUnlessEqual(rec["original-shares"], 2)
1883 self.failUnlessEqual(rec["configured-shares"], 2)
1884 size = sf0_size + sf2_size
1885 self.failUnlessEqual(rec["actual-sharebytes"], size)
1886 self.failUnlessEqual(rec["original-sharebytes"], size)
1887 self.failUnlessEqual(rec["configured-sharebytes"], size)
1888 # different platforms have different notions of "blocks used by
1889 # this file", so merely assert that it's a number
1890 self.failUnless(rec["actual-diskbytes"] >= 0,
1891 rec["actual-diskbytes"])
1892 self.failUnless(rec["original-diskbytes"] >= 0,
1893 rec["original-diskbytes"])
1894 self.failUnless(rec["configured-diskbytes"] >= 0,
1895 rec["configured-diskbytes"])
1896 d.addCallback(_after_first_cycle)
1897 d.addCallback(lambda ign: self.render1(webstatus))
1898 def _check_html(html):
1899 s = remove_tags(html)
1900 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1901 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1902 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1903 d.addCallback(_check_html)
1906 def test_expire_cutoff_date(self):
1907 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1908 fileutil.make_dirs(basedir)
1909 # setting cutoff-date to 2000 seconds ago means that any lease which
1910 # is more than 2000s old will be expired.
1912 then = int(now - 2000)
1913 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1914 expiration_enabled=True,
1915 expiration_mode="cutoff-date",
1916 expiration_cutoff_date=then)
1917 # make it start sooner than usual.
1918 lc = ss.lease_checker
1920 lc.stop_after_first_bucket = True
1921 webstatus = StorageStatus(ss)
1923 # create a few shares, with some leases on them
1924 self.make_shares(ss)
1925 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1927 def count_shares(si):
1928 return len(list(ss._iter_share_files(si)))
1929 def _get_sharefile(si):
1930 return list(ss._iter_share_files(si))[0]
1931 def count_leases(si):
1932 return len(list(_get_sharefile(si).get_leases()))
1934 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1935 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1936 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1937 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1938 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1939 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1940 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1941 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1943 # artificially crank back the expiration time on the first lease of
1944 # each share, to make it look like it was renewed 3000s ago. To achieve
1945 # this, we need to set the expiration time to now-3000+31days. This
1946 # will change when the lease format is improved to contain both
1947 # create/renew time and duration.
1948 new_expiration_time = now - 3000 + 31*24*60*60
1950 # Some shares have an extra lease which is set to expire at the
1951 # default time in 31 days from now (age=31days). We then run the
1952 # crawler, which will expire the first lease, making some shares get
1953 # deleted and others stay alive (with one remaining lease)
1955 sf0 = _get_sharefile(immutable_si_0)
1956 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1957 sf0_size = os.stat(sf0.home).st_size
1959 # immutable_si_1 gets an extra lease
1960 sf1 = _get_sharefile(immutable_si_1)
1961 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1963 sf2 = _get_sharefile(mutable_si_2)
1964 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1965 sf2_size = os.stat(sf2.home).st_size
1967 # mutable_si_3 gets an extra lease
1968 sf3 = _get_sharefile(mutable_si_3)
1969 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1971 ss.setServiceParent(self.s)
1973 d = fireEventually()
1974 # examine the state right after the first bucket has been processed
1975 def _after_first_bucket(ignored):
1976 p = lc.get_progress()
1977 self.failUnless(p["cycle-in-progress"])
1978 d.addCallback(_after_first_bucket)
1979 d.addCallback(lambda ign: self.render1(webstatus))
1980 def _check_html_in_cycle(html):
1981 s = remove_tags(html)
1982 # the first bucket encountered gets deleted, and its prefix
1983 # happens to be about 1/5th of the way through the ring, so the
1984 # predictor thinks we'll have 5 shares and that we'll delete them
1985 # all. This part of the test depends upon the SIs landing right
1986 # where they do now.
1987 self.failUnlessIn("The remainder of this cycle is expected to "
1988 "recover: 4 shares, 4 buckets", s)
1989 self.failUnlessIn("The whole cycle is expected to examine "
1990 "5 shares in 5 buckets and to recover: "
1991 "5 shares, 5 buckets", s)
1992 d.addCallback(_check_html_in_cycle)
1994 # wait for the crawler to finish the first cycle. Two shares should
1997 return bool(lc.get_state()["last-cycle-finished"] is not None)
1998 d.addCallback(lambda ign: self.poll(_wait))
2000 def _after_first_cycle(ignored):
2001 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2002 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2003 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2004 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2005 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2006 self.failUnlessEqual(count_leases(mutable_si_3), 1)
2009 last = s["history"][0]
2011 self.failUnlessEqual(last["expiration-enabled"], True)
2012 self.failUnlessEqual(last["configured-expiration-mode"],
2013 ("cutoff-date", None, then,
2014 ("mutable", "immutable")))
2015 self.failUnlessEqual(last["leases-per-share-histogram"],
2018 rec = last["space-recovered"]
2019 self.failUnlessEqual(rec["examined-buckets"], 4)
2020 self.failUnlessEqual(rec["examined-shares"], 4)
2021 self.failUnlessEqual(rec["actual-buckets"], 2)
2022 self.failUnlessEqual(rec["original-buckets"], 0)
2023 self.failUnlessEqual(rec["configured-buckets"], 2)
2024 self.failUnlessEqual(rec["actual-shares"], 2)
2025 self.failUnlessEqual(rec["original-shares"], 0)
2026 self.failUnlessEqual(rec["configured-shares"], 2)
2027 size = sf0_size + sf2_size
2028 self.failUnlessEqual(rec["actual-sharebytes"], size)
2029 self.failUnlessEqual(rec["original-sharebytes"], 0)
2030 self.failUnlessEqual(rec["configured-sharebytes"], size)
2031 # different platforms have different notions of "blocks used by
2032 # this file", so merely assert that it's a number
2033 self.failUnless(rec["actual-diskbytes"] >= 0,
2034 rec["actual-diskbytes"])
2035 self.failUnless(rec["original-diskbytes"] >= 0,
2036 rec["original-diskbytes"])
2037 self.failUnless(rec["configured-diskbytes"] >= 0,
2038 rec["configured-diskbytes"])
2039 d.addCallback(_after_first_cycle)
2040 d.addCallback(lambda ign: self.render1(webstatus))
2041 def _check_html(html):
2042 s = remove_tags(html)
2043 self.failUnlessIn("Expiration Enabled:"
2044 " expired leases will be removed", s)
2045 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2046 substr = "Leases created or last renewed before %s will be considered expired." % date
2047 self.failUnlessIn(substr, s)
2048 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2049 d.addCallback(_check_html)
2052 def test_only_immutable(self):
2053 basedir = "storage/LeaseCrawler/only_immutable"
2054 fileutil.make_dirs(basedir)
2056 then = int(now - 2000)
2057 ss = StorageServer(basedir, "\x00" * 20,
2058 expiration_enabled=True,
2059 expiration_mode="cutoff-date",
2060 expiration_cutoff_date=then,
2061 expiration_sharetypes=("immutable",))
2062 lc = ss.lease_checker
2064 webstatus = StorageStatus(ss)
2066 self.make_shares(ss)
2067 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2068 # set all leases to be expirable
2069 new_expiration_time = now - 3000 + 31*24*60*60
2071 def count_shares(si):
2072 return len(list(ss._iter_share_files(si)))
2073 def _get_sharefile(si):
2074 return list(ss._iter_share_files(si))[0]
2075 def count_leases(si):
2076 return len(list(_get_sharefile(si).get_leases()))
2078 sf0 = _get_sharefile(immutable_si_0)
2079 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2080 sf1 = _get_sharefile(immutable_si_1)
2081 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2082 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2083 sf2 = _get_sharefile(mutable_si_2)
2084 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2085 sf3 = _get_sharefile(mutable_si_3)
2086 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2087 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2089 ss.setServiceParent(self.s)
2091 return bool(lc.get_state()["last-cycle-finished"] is not None)
2092 d = self.poll(_wait)
2094 def _after_first_cycle(ignored):
2095 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2096 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2097 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2098 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2099 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2100 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2101 d.addCallback(_after_first_cycle)
2102 d.addCallback(lambda ign: self.render1(webstatus))
2103 def _check_html(html):
2104 s = remove_tags(html)
2105 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2106 d.addCallback(_check_html)
2109 def test_only_mutable(self):
2110 basedir = "storage/LeaseCrawler/only_mutable"
2111 fileutil.make_dirs(basedir)
2113 then = int(now - 2000)
2114 ss = StorageServer(basedir, "\x00" * 20,
2115 expiration_enabled=True,
2116 expiration_mode="cutoff-date",
2117 expiration_cutoff_date=then,
2118 expiration_sharetypes=("mutable",))
2119 lc = ss.lease_checker
2121 webstatus = StorageStatus(ss)
2123 self.make_shares(ss)
2124 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2125 # set all leases to be expirable
2126 new_expiration_time = now - 3000 + 31*24*60*60
2128 def count_shares(si):
2129 return len(list(ss._iter_share_files(si)))
2130 def _get_sharefile(si):
2131 return list(ss._iter_share_files(si))[0]
2132 def count_leases(si):
2133 return len(list(_get_sharefile(si).get_leases()))
2135 sf0 = _get_sharefile(immutable_si_0)
2136 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2137 sf1 = _get_sharefile(immutable_si_1)
2138 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2139 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2140 sf2 = _get_sharefile(mutable_si_2)
2141 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2142 sf3 = _get_sharefile(mutable_si_3)
2143 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2144 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2146 ss.setServiceParent(self.s)
2148 return bool(lc.get_state()["last-cycle-finished"] is not None)
2149 d = self.poll(_wait)
2151 def _after_first_cycle(ignored):
2152 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2153 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2154 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2155 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2156 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2157 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2158 d.addCallback(_after_first_cycle)
2159 d.addCallback(lambda ign: self.render1(webstatus))
2160 def _check_html(html):
2161 s = remove_tags(html)
2162 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2163 d.addCallback(_check_html)
def test_bad_mode(self):
    # Constructing a StorageServer with an unrecognized expiration_mode
    # must raise ValueError, and the message must name the accepted modes.
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    err = self.failUnlessRaises(ValueError,
                                StorageServer, workdir, "\x00" * 20,
                                expiration_mode="bogus")
    message = str(err)
    expected = "GC mode 'bogus' must be 'age' or 'cutoff-date'"
    self.failUnless(expected in message, message)
2174 def test_parse_duration(self):
2178 p = time_format.parse_duration
2179 self.failUnlessEqual(p("7days"), 7*DAY)
2180 self.failUnlessEqual(p("31day"), 31*DAY)
2181 self.failUnlessEqual(p("60 days"), 60*DAY)
2182 self.failUnlessEqual(p("2mo"), 2*MONTH)
2183 self.failUnlessEqual(p("3 month"), 3*MONTH)
2184 self.failUnlessEqual(p("2years"), 2*YEAR)
2185 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2186 self.failUnless("no unit (like day, month, or year) in '2kumquats'"
def test_parse_date(self):
    # parse_date() must turn a "YYYY-MM-DD" string into an int;
    # 1237334400 is 2009-03-18 00:00:00 UTC expressed in epoch seconds.
    parse = time_format.parse_date
    date_string = "2009-03-18"
    self.failUnless(isinstance(parse(date_string), int))
    self.failUnlessEqual(parse(date_string), 1237334400)
2194 def test_limited_history(self):
2195 basedir = "storage/LeaseCrawler/limited_history"
2196 fileutil.make_dirs(basedir)
2197 ss = StorageServer(basedir, "\x00" * 20)
2198 # make it start sooner than usual.
2199 lc = ss.lease_checker
2203 # create a few shares, with some leases on them
2204 self.make_shares(ss)
2206 ss.setServiceParent(self.s)
2208 def _wait_until_15_cycles_done():
2209 last = lc.state["last-cycle-finished"]
2210 if last is not None and last >= 15:
2215 d = self.poll(_wait_until_15_cycles_done)
2217 def _check(ignored):
2220 self.failUnlessEqual(len(h), 10)
2221 self.failUnlessEqual(max(h.keys()), 15)
2222 self.failUnlessEqual(min(h.keys()), 6)
2223 d.addCallback(_check)
2226 def test_unpredictable_future(self):
2227 basedir = "storage/LeaseCrawler/unpredictable_future"
2228 fileutil.make_dirs(basedir)
2229 ss = StorageServer(basedir, "\x00" * 20)
2230 # make it start sooner than usual.
2231 lc = ss.lease_checker
2233 lc.cpu_slice = -1.0 # stop quickly
2235 self.make_shares(ss)
2237 ss.setServiceParent(self.s)
2239 d = fireEventually()
def _check(ignored):
    # This should fire after the first bucket is complete, but before
    # the first prefix is complete, so the progress-measurer won't think
    # we've gotten far enough to raise our percent-complete above 0%,
    # triggering the cannot-predict-the-future code in expirer.py . This
    # will have to change if/when the progress-measurer gets smart enough
    # to count buckets (we'll have to interrupt it even earlier, before
    # it's finished the first bucket).
    #
    # NOTE(review): no binding for `s` is visible in the reviewed text;
    # lc.get_state() is the mapping the sibling tests read
    # "cycle-to-date" from -- confirm.
    s = lc.get_state()
    self.failUnless("cycle-to-date" in s)
    self.failUnless("estimated-remaining-cycle" in s)
    self.failUnless("estimated-current-cycle" in s)

    # Both estimates must report every space-recovered figure as unknown
    # (None). The whole-cycle estimate used to be read from
    # "estimated-remaining-cycle" as well, which left
    # "estimated-current-cycle" unchecked.
    categories = ("actual", "original", "configured")
    units = ("buckets", "shares", "diskbytes", "sharebytes")
    for estimate in ("estimated-remaining-cycle",
                     "estimated-current-cycle"):
        rec = s[estimate]["space-recovered"]
        for unit in units:
            for category in categories:
                key = "%s-%s" % (category, unit)
                self.failUnlessEqual(rec[key], None,
                                     "%s[%s]" % (estimate, key))
2282 d.addCallback(_check)
2285 def test_no_st_blocks(self):
2286 basedir = "storage/LeaseCrawler/no_st_blocks"
2287 fileutil.make_dirs(basedir)
2288 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2289 expiration_mode="age",
2290 expiration_override_lease_duration=-1000)
2291 # a negative expiration_time= means the "configured-"
2292 # space-recovered counts will be non-zero, since all shares will have
2295 # make it start sooner than usual.
2296 lc = ss.lease_checker
2299 self.make_shares(ss)
2300 ss.setServiceParent(self.s)
2302 return bool(lc.get_state()["last-cycle-finished"] is not None)
2303 d = self.poll(_wait)
2305 def _check(ignored):
2307 last = s["history"][0]
2308 rec = last["space-recovered"]
2309 self.failUnlessEqual(rec["configured-buckets"], 4)
2310 self.failUnlessEqual(rec["configured-shares"], 4)
2311 self.failUnless(rec["configured-sharebytes"] > 0,
2312 rec["configured-sharebytes"])
2313 # without the .st_blocks field in os.stat() results, we should be
2314 # reporting diskbytes==sharebytes
2315 self.failUnlessEqual(rec["configured-sharebytes"],
2316 rec["configured-diskbytes"])
2317 d.addCallback(_check)
2320 def test_share_corruption(self):
2321 self._poll_should_ignore_these_errors = [
2322 UnknownMutableContainerVersionError,
2323 UnknownImmutableContainerVersionError,
2325 basedir = "storage/LeaseCrawler/share_corruption"
2326 fileutil.make_dirs(basedir)
2327 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2328 w = StorageStatus(ss)
2329 # make it start sooner than usual.
2330 lc = ss.lease_checker
2331 lc.stop_after_first_bucket = True
2335 # create a few shares, with some leases on them
2336 self.make_shares(ss)
2338 # now corrupt one, and make sure the lease-checker keeps going
2339 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2340 first = min(self.sis)
2341 first_b32 = base32.b2a(first)
2342 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2345 f.write("BAD MAGIC")
2347 # if get_share_file() doesn't see the correct mutable magic, it
2348 # assumes the file is an immutable share, and then
2349 # immutable.ShareFile sees a bad version. So regardless of which kind
2350 # of share we corrupted, this will trigger an
2351 # UnknownImmutableContainerVersionError.
2353 # also create an empty bucket
2354 empty_si = base32.b2a("\x04"*16)
2355 empty_bucket_dir = os.path.join(ss.sharedir,
2356 storage_index_to_dir(empty_si))
2357 fileutil.make_dirs(empty_bucket_dir)
2359 ss.setServiceParent(self.s)
2361 d = fireEventually()
2363 # now examine the state right after the first bucket has been
2365 def _after_first_bucket(ignored):
2366 so_far = lc.get_state()["cycle-to-date"]
2367 rec = so_far["space-recovered"]
2368 self.failUnlessEqual(rec["examined-buckets"], 1)
2369 self.failUnlessEqual(rec["examined-shares"], 0)
2370 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2371 d.addCallback(_after_first_bucket)
2373 d.addCallback(lambda ign: self.render_json(w))
2374 def _check_json(json):
2375 data = simplejson.loads(json)
2376 # grr. json turns all dict keys into strings.
2377 so_far = data["lease-checker"]["cycle-to-date"]
2378 corrupt_shares = so_far["corrupt-shares"]
2379 # it also turns all tuples into lists
2380 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2381 d.addCallback(_check_json)
2382 d.addCallback(lambda ign: self.render1(w))
2383 def _check_html(html):
2384 s = remove_tags(html)
2385 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2386 d.addCallback(_check_html)
2389 return bool(lc.get_state()["last-cycle-finished"] is not None)
2390 d.addCallback(lambda ign: self.poll(_wait))
2392 def _after_first_cycle(ignored):
2394 last = s["history"][0]
2395 rec = last["space-recovered"]
2396 self.failUnlessEqual(rec["examined-buckets"], 5)
2397 self.failUnlessEqual(rec["examined-shares"], 3)
2398 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2399 d.addCallback(_after_first_cycle)
2400 d.addCallback(lambda ign: self.render_json(w))
2401 def _check_json_history(json):
2402 data = simplejson.loads(json)
2403 last = data["lease-checker"]["history"]["0"]
2404 corrupt_shares = last["corrupt-shares"]
2405 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2406 d.addCallback(_check_json_history)
2407 d.addCallback(lambda ign: self.render1(w))
2408 def _check_html_history(html):
2409 s = remove_tags(html)
2410 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2411 d.addCallback(_check_html_history)
2414 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2415 UnknownImmutableContainerVersionError)
def render_json(self, page):
    # Render `page` with the query argument t=json and hand back whatever
    # render1() produces. Callers chain a callback on the result that
    # feeds it to simplejson.loads(), so it must be returned rather than
    # dropped.
    d = self.render1(page, args={"t": ["json"]})
    return d
class NoDiskStatsServer(StorageServer):
    # StorageServer variant whose get_disk_stats() always raises
    # AttributeError; test_status_no_disk_stats uses it to stand in for a
    # platform that has no disk stats API.
    def get_disk_stats(self):
        raise AttributeError()
2428 class BadDiskStatsServer(StorageServer):
2429 def get_disk_stats(self):
2432 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2435 self.s = service.MultiService()
2436 self.s.startService()
2438 return self.s.stopService()
def test_no_server(self):
    # A StorageStatus built around None (no storage server) must render
    # the "No Storage Server Running" heading.
    page = StorageStatus(None).renderSynchronously()
    heading = "<h1>No Storage Server Running</h1>"
    self.failUnless(heading in page, page)
2445 def test_status(self):
2446 basedir = "storage/WebStatus/status"
2447 fileutil.make_dirs(basedir)
2448 ss = StorageServer(basedir, "\x00" * 20)
2449 ss.setServiceParent(self.s)
2450 w = StorageStatus(ss)
2452 def _check_html(html):
2453 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2454 s = remove_tags(html)
2455 self.failUnless("Accepting new shares: Yes" in s, s)
2456 self.failUnless("Reserved space: - 0 B (0)" in s, s)
2457 d.addCallback(_check_html)
2458 d.addCallback(lambda ign: self.render_json(w))
2459 def _check_json(json):
2460 data = simplejson.loads(json)
2462 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2463 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2464 self.failUnless("bucket-counter" in data)
2465 self.failUnless("lease-checker" in data)
2466 d.addCallback(_check_json)
def render_json(self, page):
    # Render `page` with the query argument t=json and hand back whatever
    # render1() produces. test_status chains _check_json on the result,
    # which passes it to simplejson.loads(), so it must be returned
    # rather than dropped.
    d = self.render1(page, args={"t": ["json"]})
    return d
def test_status_no_disk_stats(self):
    # Some platforms may have no disk stats API. Make sure the code can
    # handle that (test runs on all platforms): the server still accepts
    # shares, the page shows "?" for the space figures, and
    # get_available_space() is None.
    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = NoDiskStatsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    for expected in ("Accepting new shares: Yes",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnless(expected in text, text)
    self.failUnless(server.get_available_space() is None)
def test_status_bad_disk_stats(self):
    # If the API to get disk stats exists but a call to it fails, then
    # the status should show that no shares will be accepted, and
    # get_available_space() should be 0.
    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = BadDiskStatsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    for expected in ("Accepting new shares: No",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnless(expected in text, text)
    self.failUnless(server.get_available_space() == 0)
def test_readonly(self):
    # A server created with readonly_storage=True must report on its
    # status page that it is not accepting new shares.
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    self.failUnless("Accepting new shares: No" in text, text)
def test_reserved(self):
    # The configured reserved_space must appear on the status page, both
    # abbreviated ("10.00 MB") and as the exact number (10000000).
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in text, text)
def test_huge_reserved(self):
    # Render the status page for a server with reserved_space set and
    # check the "Reserved space" line.
    #
    # This test gets its own base directory: it previously reused
    # "storage/WebStatus/reserved", the directory test_reserved works in,
    # so the two tests shared on-disk state.
    #
    # NOTE(review): reserved_space here is the same 10e6 that
    # test_reserved uses, so despite the name no larger value is
    # exercised -- confirm whether a bigger figure was intended.
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    s = remove_tags(html)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
def test_util(self):
    # Check the small helpers behind the status page: both space
    # renderers give "?" for None and a formatted string for 10e6, and
    # remove_prefix() strips a matching prefix or returns None.
    status = StorageStatus(None)
    render_cases = [
        (status.render_space, None, "?"),
        (status.render_space, 10e6, "10000000"),
        (status.render_abbrev_space, None, "?"),
        (status.render_abbrev_space, 10e6, "10.00 MB"),
    ]
    for renderer, value, expected in render_cases:
        self.failUnlessEqual(renderer(None, value), expected)
    for prefix, expected in (("foo.", "bar"), ("baz.", None)):
        self.failUnlessEqual(remove_prefix("foo.bar", prefix), expected)