2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
def __init__(self, ignore_disconnectors=False):
    """Start with no registered disconnect callbacks.

    ``ignore_disconnectors`` is remembered as ``self.ignore``.
    ``self.disconnectors`` is the registry that notifyOnDisconnect fills
    with ``marker -> (f, args, kwargs)`` entries.
    """
    self.disconnectors = dict()
    self.ignore = ignore_disconnectors
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Create the working directories used by the test called ``name``.

    Makes ``storage/Bucket/<name>`` and its ``tmp`` subdirectory, then
    returns the ``(incoming, final)`` bucket paths located inside them.
    The bucket paths themselves are not created here.
    """
    workdir = os.path.join("storage", "Bucket", name)
    tmpdir = os.path.join(workdir, "tmp")
    for directory in (workdir, tmpdir):
        fileutil.make_dirs(directory)
    return os.path.join(tmpdir, "bucket"), os.path.join(workdir, "bucket")
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnlessIn("to peer", repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    # Write/read round trip using the v1 write proxy, whose share header
    # is 0x24 bytes long.
    return self._do_test_readwrite("test_readwrite_v1",
                                   header_size=0x24,
                                   wbp_class=WriteBucketProxy,
                                   rbp_class=ReadBucketProxy)
def test_readwrite_v2(self):
    # Write/read round trip using the v2 write proxy, whose share header
    # is 0x44 bytes long.
    return self._do_test_readwrite("test_readwrite_v2",
                                   header_size=0x44,
                                   wbp_class=WriteBucketProxy_v2,
                                   rbp_class=ReadBucketProxy)
230 class FakeDiskStorageServer(StorageServer):
232 def get_disk_stats(self):
233 return { 'free_for_nonroot': self.DISKAVAIL, 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }
235 class Server(unittest.TestCase):
238 self.sparent = LoggingServiceParent()
239 self.sparent.startService()
240 self._lease_secret = itertools.count()
242 return self.sparent.stopService()
244 def workdir(self, name):
245 basedir = os.path.join("storage", "Server", name)
248 def create(self, name, reserved_space=0, klass=StorageServer):
249 workdir = self.workdir(name)
250 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
251 stats_provider=FakeStatsProvider())
252 ss.setServiceParent(self.sparent)
def test_create(self):
    # Smoke test: building a server via create() must not raise.
    name = "test_create"
    self.create(name)
258 def allocate(self, ss, storage_index, sharenums, size, canary=None):
259 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
262 canary = FakeCanary()
263 return ss.remote_allocate_buckets(storage_index,
264 renew_secret, cancel_secret,
265 sharenums, size, canary)
def test_large_share(self):
    # Allocate one share slightly larger than 4 GiB, write two bytes at
    # offset 2**32, and check that they read back unchanged.
    offset = 2**32
    payload = "ab"
    ss = self.create("test_large_share")

    already, writers = self.allocate(ss, "allocate", [0],
                                     offset + len(payload))
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(set(writers.keys()), set([0]))

    first_entry = writers.items()[0]
    shnum, bucket = first_entry
    # Only the tail of the share is written, so this hammers the
    # filesystem unless it stores the share as a sparse file. :-(
    bucket.remote_write(offset, payload)
    bucket.remote_close()

    reader = ss.remote_get_buckets("allocate")[shnum]
    self.failUnlessEqual(reader.remote_read(offset, len(payload)), payload)
test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
284 def test_dont_overfill_dirs(self):
286 This test asserts that if you add a second share whose storage index
287 share lots of leading bits with an extant share (but isn't the exact
288 same storage index), this won't add an entry to the share directory.
290 ss = self.create("test_dont_overfill_dirs")
291 already, writers = self.allocate(ss, "storageindex", [0], 10)
292 for i, wb in writers.items():
293 wb.remote_write(0, "%10d" % i)
295 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
297 children_of_storedir = set(os.listdir(storedir))
299 # Now store another one under another storageindex that has leading
300 # chars the same as the first storageindex.
301 already, writers = self.allocate(ss, "storageindey", [0], 10)
302 for i, wb in writers.items():
303 wb.remote_write(0, "%10d" % i)
305 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
307 new_children_of_storedir = set(os.listdir(storedir))
308 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
310 def test_remove_incoming(self):
311 ss = self.create("test_remove_incoming")
312 already, writers = self.allocate(ss, "vid", range(3), 10)
313 for i,wb in writers.items():
314 wb.remote_write(0, "%10d" % i)
316 incoming_share_dir = wb.incominghome
317 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
318 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
319 incoming_dir = os.path.dirname(incoming_prefix_dir)
320 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
321 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
322 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
324 def test_allocate(self):
325 ss = self.create("test_allocate")
327 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 self.failUnlessIn("BucketReader", b_str)
349 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
351 # now if we ask about writing again, the server should offer those
352 # three buckets as already present. It should offer them even if we
353 # don't ask about those specific ones.
354 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355 self.failUnlessEqual(already, set([0,1,2]))
356 self.failUnlessEqual(set(writers.keys()), set([3,4]))
358 # while those two buckets are open for writing, the server should
359 # refuse to offer them to uploaders
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 # aborting the writes should remove the tempfiles
366 for i,wb in writers2.items():
368 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369 self.failUnlessEqual(already2, set([0,1,2]))
370 self.failUnlessEqual(set(writers2.keys()), set([5]))
372 for i,wb in writers2.items():
374 for i,wb in writers.items():
377 def test_bad_container_version(self):
378 ss = self.create("test_bad_container_version")
379 a,w = self.allocate(ss, "si1", [0], 10)
380 w[0].remote_write(0, "\xff"*10)
383 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389 ss.remote_get_buckets("allocate")
391 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392 ss.remote_get_buckets, "si1")
393 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
395 def test_disconnect(self):
396 # simulate a disconnection
397 ss = self.create("test_disconnect")
398 canary = FakeCanary()
399 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400 self.failUnlessEqual(already, set())
401 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402 for (f,args,kwargs) in canary.disconnectors.values():
407 # that ought to delete the incoming shares
408 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409 self.failUnlessEqual(already, set())
410 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412 def test_reserved_space(self):
413 ss = self.create("test_reserved_space", reserved_space=10000,
414 klass=FakeDiskStorageServer)
415 # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
417 # 15k available, 10k reserved, leaves 5k for shares
419 # a newly created and filled share incurs this much overhead, beyond
420 # the size we request.
422 LEASE_SIZE = 4+32+32+4
423 canary = FakeCanary(True)
424 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425 self.failUnlessEqual(len(writers), 3)
426 # now the StorageServer should have 3000 bytes provisionally
427 # allocated, allowing only 2000 more to be claimed
428 self.failUnlessEqual(len(ss._active_writers), 3)
430 # allocating 1001-byte shares only leaves room for one
431 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432 self.failUnlessEqual(len(writers2), 1)
433 self.failUnlessEqual(len(ss._active_writers), 4)
435 # we abandon the first set, so their provisional allocation should be
439 self.failUnlessEqual(len(ss._active_writers), 1)
440 # now we have a provisional allocation of 1001 bytes
442 # and we close the second set, so their provisional allocation should
443 # become real, long-term allocation, and grows to include the
445 for bw in writers2.values():
446 bw.remote_write(0, "a"*25)
451 self.failUnlessEqual(len(ss._active_writers), 0)
453 allocated = 1001 + OVERHEAD + LEASE_SIZE
455 # we have to manually increase DISKAVAIL, since we're not doing real
457 ss.DISKAVAIL -= allocated
459 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462 self.failUnlessEqual(len(writers3), 39)
463 self.failUnlessEqual(len(ss._active_writers), 39)
467 self.failUnlessEqual(len(ss._active_writers), 0)
468 ss.disownServiceParent()
def test_disk_stats(self):
    # This will spuriously fail if there is zero disk space left (but so
    # will other tests).
    ss = self.create("test_disk_stats", reserved_space=0)

    disk = ss.get_disk_stats()
    # Each reported figure must be strictly positive; the figure itself
    # is passed as the failure message so a bad value shows up in the
    # report.
    for key in ('total', 'used', 'free_for_root', 'free_for_nonroot',
                'avail'):
        self.failUnless(disk[key] > 0, disk[key])
def test_disk_stats_avail_nonnegative(self):
    # With an enormous reservation (2**64 bytes) the reported 'avail'
    # figure is expected to be exactly 0, never a negative number.
    huge_reservation = 2**64
    ss = self.create("test_disk_stats_avail_nonnegative",
                     reserved_space=huge_reservation)

    stats = ss.get_disk_stats()
    self.failUnlessEqual(stats['avail'], 0)
489 basedir = self.workdir("test_seek_behavior")
490 fileutil.make_dirs(basedir)
491 filename = os.path.join(basedir, "testfile")
492 f = open(filename, "wb")
495 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
496 # files. mode="a" preserves previous contents but does not allow
497 # seeking-to-create-holes. mode="r+" allows both.
498 f = open(filename, "rb+")
502 filelen = os.stat(filename)[stat.ST_SIZE]
503 self.failUnlessEqual(filelen, 100+3)
504 f2 = open(filename, "rb")
505 self.failUnlessEqual(f2.read(5), "start")
508 def test_leases(self):
509 ss = self.create("test_leases")
510 canary = FakeCanary()
514 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
515 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
516 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
517 sharenums, size, canary)
518 self.failUnlessEqual(len(already), 0)
519 self.failUnlessEqual(len(writers), 5)
520 for wb in writers.values():
523 leases = list(ss.get_leases("si0"))
524 self.failUnlessEqual(len(leases), 1)
525 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
527 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
528 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
529 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
530 sharenums, size, canary)
531 for wb in writers.values():
534 # take out a second lease on si1
535 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
536 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
537 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
538 sharenums, size, canary)
539 self.failUnlessEqual(len(already), 5)
540 self.failUnlessEqual(len(writers), 0)
542 leases = list(ss.get_leases("si1"))
543 self.failUnlessEqual(len(leases), 2)
544 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
546 # and a third lease, using add-lease
547 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
548 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
549 ss.remote_add_lease("si1", rs2a, cs2a)
550 leases = list(ss.get_leases("si1"))
551 self.failUnlessEqual(len(leases), 3)
552 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
554 # add-lease on a missing storage index is silently ignored
555 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
557 # check that si0 is readable
558 readers = ss.remote_get_buckets("si0")
559 self.failUnlessEqual(len(readers), 5)
561 # renew the first lease. Only the proper renew_secret should work
562 ss.remote_renew_lease("si0", rs0)
563 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
564 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
566 # check that si0 is still readable
567 readers = ss.remote_get_buckets("si0")
568 self.failUnlessEqual(len(readers), 5)
571 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
572 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
573 ss.remote_cancel_lease("si0", cs0)
575 # si0 should now be gone
576 readers = ss.remote_get_buckets("si0")
577 self.failUnlessEqual(len(readers), 0)
578 # and the renew should no longer work
579 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
582 # cancel the first lease on si1, leaving the second and third in place
583 ss.remote_cancel_lease("si1", cs1)
584 readers = ss.remote_get_buckets("si1")
585 self.failUnlessEqual(len(readers), 5)
586 # the corresponding renew should no longer work
587 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
589 leases = list(ss.get_leases("si1"))
590 self.failUnlessEqual(len(leases), 2)
591 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
593 ss.remote_renew_lease("si1", rs2)
594 # cancelling the second and third should make it go away
595 ss.remote_cancel_lease("si1", cs2)
596 ss.remote_cancel_lease("si1", cs2a)
597 readers = ss.remote_get_buckets("si1")
598 self.failUnlessEqual(len(readers), 0)
599 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
600 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
601 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
603 leases = list(ss.get_leases("si1"))
604 self.failUnlessEqual(len(leases), 0)
607 # test overlapping uploads
608 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
611 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
612 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
613 sharenums, size, canary)
614 self.failUnlessEqual(len(already), 0)
615 self.failUnlessEqual(len(writers), 5)
616 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
617 sharenums, size, canary)
618 self.failUnlessEqual(len(already2), 0)
619 self.failUnlessEqual(len(writers2), 0)
620 for wb in writers.values():
623 leases = list(ss.get_leases("si3"))
624 self.failUnlessEqual(len(leases), 1)
626 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
627 sharenums, size, canary)
628 self.failUnlessEqual(len(already3), 5)
629 self.failUnlessEqual(len(writers3), 0)
631 leases = list(ss.get_leases("si3"))
632 self.failUnlessEqual(len(leases), 2)
def test_readonly(self):
    # A server built with readonly_storage=True must not hand out any
    # bucket writers and must report that it accepts no immutable shares.
    ss = StorageServer(self.workdir("test_readonly"), "\x00" * 20,
                       readonly_storage=True)
    ss.setServiceParent(self.sparent)

    already, writers = self.allocate(ss, "vid", [0,1,2], 75)
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(writers, {})

    stats = ss.get_stats()
    self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)

    avail_key = "storage_server.disk_avail"
    if avail_key not in stats:
        # Some platforms may not have an API to get disk stats.
        return
    # But if there are stats, readonly_storage means disk_avail=0
    self.failUnlessEqual(stats[avail_key], 0)
650 def test_discard(self):
651 # discard is really only used for other tests, but we test it anyways
652 workdir = self.workdir("test_discard")
653 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
654 ss.setServiceParent(self.sparent)
656 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
657 self.failUnlessEqual(already, set())
658 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
659 for i,wb in writers.items():
660 wb.remote_write(0, "%25d" % i)
662 # since we discard the data, the shares should be present but sparse.
663 # Since we write with some seeks, the data we read back will be all
665 b = ss.remote_get_buckets("vid")
666 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
667 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
669 def test_advise_corruption(self):
670 workdir = self.workdir("test_advise_corruption")
671 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
672 ss.setServiceParent(self.sparent)
674 si0_s = base32.b2a("si0")
675 ss.remote_advise_corrupt_share("immutable", "si0", 0,
676 "This share smells funny.\n")
677 reportdir = os.path.join(workdir, "corruption-advisories")
678 reports = os.listdir(reportdir)
679 self.failUnlessEqual(len(reports), 1)
680 report_si0 = reports[0]
681 self.failUnlessIn(si0_s, report_si0)
682 f = open(os.path.join(reportdir, report_si0), "r")
685 self.failUnlessIn("type: immutable", report)
686 self.failUnlessIn("storage_index: %s" % si0_s, report)
687 self.failUnlessIn("share_number: 0", report)
688 self.failUnlessIn("This share smells funny.", report)
690 # test the RIBucketWriter version too
691 si1_s = base32.b2a("si1")
692 already,writers = self.allocate(ss, "si1", [1], 75)
693 self.failUnlessEqual(already, set())
694 self.failUnlessEqual(set(writers.keys()), set([1]))
695 writers[1].remote_write(0, "data")
696 writers[1].remote_close()
698 b = ss.remote_get_buckets("si1")
699 self.failUnlessEqual(set(b.keys()), set([1]))
700 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
702 reports = os.listdir(reportdir)
703 self.failUnlessEqual(len(reports), 2)
704 report_si1 = [r for r in reports if si1_s in r][0]
705 f = open(os.path.join(reportdir, report_si1), "r")
708 self.failUnlessIn("type: immutable", report)
709 self.failUnlessIn("storage_index: %s" % si1_s, report)
710 self.failUnlessIn("share_number: 1", report)
711 self.failUnlessIn("This share tastes like dust.", report)
715 class MutableServer(unittest.TestCase):
718 self.sparent = LoggingServiceParent()
719 self._lease_secret = itertools.count()
721 return self.sparent.stopService()
723 def workdir(self, name):
724 basedir = os.path.join("storage", "MutableServer", name)
727 def create(self, name):
728 workdir = self.workdir(name)
729 ss = StorageServer(workdir, "\x00" * 20)
730 ss.setServiceParent(self.sparent)
def test_create(self):
    # Smoke test: building a server via create() must not raise.
    name = "test_create"
    self.create(name)
def write_enabler(self, we_tag):
    """Return the hash of ``we_tag`` computed under the "we_blah" tag."""
    hash_tag = "we_blah"
    return hashutil.tagged_hash(hash_tag, we_tag)
def renew_secret(self, tag):
    """Return the hash of ``str(tag)`` computed under the "renew_blah" tag."""
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return the hash of ``str(tag)`` computed under the "cancel_blah" tag."""
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
745 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
746 write_enabler = self.write_enabler(we_tag)
747 renew_secret = self.renew_secret(lease_tag)
748 cancel_secret = self.cancel_secret(lease_tag)
749 rstaraw = ss.remote_slot_testv_and_readv_and_writev
750 testandwritev = dict( [ (shnum, ([], [], None) )
751 for shnum in sharenums ] )
753 rc = rstaraw(storage_index,
754 (write_enabler, renew_secret, cancel_secret),
757 (did_write, readv_data) = rc
758 self.failUnless(did_write)
759 self.failUnless(isinstance(readv_data, dict))
760 self.failUnlessEqual(len(readv_data), 0)
762 def test_bad_magic(self):
763 ss = self.create("test_bad_magic")
764 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
765 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
770 read = ss.remote_slot_readv
771 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
772 read, "si1", [0], [(0,10)])
773 self.failUnlessIn(" had magic ", str(e))
774 self.failUnlessIn(" but we wanted ", str(e))
776 def test_container_size(self):
777 ss = self.create("test_container_size")
778 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
780 read = ss.remote_slot_readv
781 rstaraw = ss.remote_slot_testv_and_readv_and_writev
782 secrets = ( self.write_enabler("we1"),
783 self.renew_secret("we1"),
784 self.cancel_secret("we1") )
785 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
786 answer = rstaraw("si1", secrets,
787 {0: ([], [(0,data)], len(data)+12)},
789 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
791 # trying to make the container too large will raise an exception
792 TOOBIG = MutableShareFile.MAX_SIZE + 10
793 self.failUnlessRaises(DataTooLargeError,
794 rstaraw, "si1", secrets,
795 {0: ([], [(0,data)], TOOBIG)},
798 # it should be possible to make the container smaller, although at
799 # the moment this doesn't actually affect the share, unless the
800 # container size is dropped to zero, in which case the share is
802 answer = rstaraw("si1", secrets,
803 {0: ([], [(0,data)], len(data)+8)},
805 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
807 answer = rstaraw("si1", secrets,
808 {0: ([], [(0,data)], 0)},
810 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
812 read_answer = read("si1", [0], [(0,10)])
813 self.failUnlessEqual(read_answer, {})
815 def test_allocate(self):
816 ss = self.create("test_allocate")
817 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
820 read = ss.remote_slot_readv
821 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
823 self.failUnlessEqual(read("si1", [], [(0, 10)]),
824 {0: [""], 1: [""], 2: [""]})
825 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
829 secrets = ( self.write_enabler("we1"),
830 self.renew_secret("we1"),
831 self.cancel_secret("we1") )
832 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
833 write = ss.remote_slot_testv_and_readv_and_writev
834 answer = write("si1", secrets,
835 {0: ([], [(0,data)], None)},
837 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
839 self.failUnlessEqual(read("si1", [0], [(0,20)]),
840 {0: ["00000000001111111111"]})
841 self.failUnlessEqual(read("si1", [0], [(95,10)]),
843 #self.failUnlessEqual(s0.remote_get_length(), 100)
845 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
846 f = self.failUnlessRaises(BadWriteEnablerError,
847 write, "si1", bad_secrets,
849 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
851 # this testv should fail
852 answer = write("si1", secrets,
853 {0: ([(0, 12, "eq", "444444444444"),
854 (20, 5, "eq", "22222"),
861 self.failUnlessEqual(answer, (False,
862 {0: ["000000000011", "22222"],
866 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
869 answer = write("si1", secrets,
870 {0: ([(10, 5, "lt", "11111"),
877 self.failUnlessEqual(answer, (False,
882 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
885 def test_operators(self):
886 # test operators, the data we're comparing is '11111' in all cases.
887 # test both fail+pass, reset data after each one.
888 ss = self.create("test_operators")
890 secrets = ( self.write_enabler("we1"),
891 self.renew_secret("we1"),
892 self.cancel_secret("we1") )
893 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
894 write = ss.remote_slot_testv_and_readv_and_writev
895 read = ss.remote_slot_readv
898 write("si1", secrets,
899 {0: ([], [(0,data)], None)},
905 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
910 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
911 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
912 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
915 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
920 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
921 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
924 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
929 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
930 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
934 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
939 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
940 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
943 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
948 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
949 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
952 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
957 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
958 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
962 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
967 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
968 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
971 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
976 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
977 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
981 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
986 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
987 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
995 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
996 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1000 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1005 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1006 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1009 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1014 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1015 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1018 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1023 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1024 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1028 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1033 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1034 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1037 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1042 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1043 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1046 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1051 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1052 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1055 # finally, test some operators against empty shares
1056 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1061 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1062 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1065 def test_readv(self):
1066 ss = self.create("test_readv")
1067 secrets = ( self.write_enabler("we1"),
1068 self.renew_secret("we1"),
1069 self.cancel_secret("we1") )
1070 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1071 write = ss.remote_slot_testv_and_readv_and_writev
1072 read = ss.remote_slot_readv
1073 data = [("%d" % i) * 100 for i in range(3)]
1074 rc = write("si1", secrets,
1075 {0: ([], [(0,data[0])], None),
1076 1: ([], [(0,data[1])], None),
1077 2: ([], [(0,data[2])], None),
1079 self.failUnlessEqual(rc, (True, {}))
1081 answer = read("si1", [], [(0, 10)])
1082 self.failUnlessEqual(answer, {0: ["0"*10],
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """Assert that two lease lists match pairwise, ignoring timestamps.

    Both lists must have the same length. Leases are compared position by
    position on owner_num, renew_secret, cancel_secret and nodeid; the
    expiration_time attribute is deliberately not examined. Any mismatch is
    reported through self.failUnlessEqual.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # The lengths are known to be equal at this point, so zip() visits every
    # lease exactly once and binds the pair under comparison.
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """Assert that two lease lists match pairwise, including timestamps.

    Both lists must have the same length. Leases are compared position by
    position on owner_num, renew_secret, cancel_secret, nodeid and
    expiration_time. Any mismatch is reported through self.failUnlessEqual.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # The lengths are known to be equal at this point, so zip() visits every
    # lease exactly once and binds the pair under comparison.
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
1107 def test_leases(self):
1108 ss = self.create("test_leases")
1110 return ( self.write_enabler("we1"),
1111 self.renew_secret("we1-%d" % n),
1112 self.cancel_secret("we1-%d" % n) )
1113 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1114 write = ss.remote_slot_testv_and_readv_and_writev
1115 read = ss.remote_slot_readv
1116 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1117 self.failUnlessEqual(rc, (True, {}))
1119 # create a random non-numeric file in the bucket directory, to
1120 # exercise the code that's supposed to ignore those.
1121 bucket_dir = os.path.join(self.workdir("test_leases"),
1122 "shares", storage_index_to_dir("si1"))
1123 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1124 f.write("you ought to be ignoring me\n")
1127 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1128 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1130 # add-lease on a missing storage index is silently ignored
1131 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1133 # re-allocate the slots and use the same secrets, that should update
1135 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1136 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1139 ss.remote_renew_lease("si1", secrets(0)[1])
1140 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1142 # now allocate them with a bunch of different secrets, to trigger the
1143 # extended lease code. Use add_lease for one of them.
1144 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1145 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1146 secrets2 = secrets(2)
1147 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1148 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1149 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1150 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1151 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1153 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1155 # cancel one of them
1156 ss.remote_cancel_lease("si1", secrets(5)[2])
1157 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1159 all_leases = list(s0.get_leases())
1160 # and write enough data to expand the container, forcing the server
1161 # to move the leases
1162 write("si1", secrets(0),
1163 {0: ([], [(0,data)], 200), },
1166 # read back the leases, make sure they're still intact.
1167 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1169 ss.remote_renew_lease("si1", secrets(0)[1])
1170 ss.remote_renew_lease("si1", secrets(1)[1])
1171 ss.remote_renew_lease("si1", secrets(2)[1])
1172 ss.remote_renew_lease("si1", secrets(3)[1])
1173 ss.remote_renew_lease("si1", secrets(4)[1])
1174 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1175 # get a new copy of the leases, with the current timestamps. Reading
1176 # data and failing to renew/cancel leases should leave the timestamps
1178 all_leases = list(s0.get_leases())
1179 # renewing with a bogus token should prompt an error message
1181 # examine the exception thus raised, make sure the old nodeid is
1182 # present, to provide for share migration
1183 e = self.failUnlessRaises(IndexError,
1184 ss.remote_renew_lease, "si1",
1187 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1188 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1189 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1191 # same for cancelling
1192 self.failUnlessRaises(IndexError,
1193 ss.remote_cancel_lease, "si1",
1195 self.compare_leases(all_leases, list(s0.get_leases()))
1197 # reading shares should not modify the timestamp
1198 read("si1", [], [(0,200)])
1199 self.compare_leases(all_leases, list(s0.get_leases()))
1201 write("si1", secrets(0),
1202 {0: ([], [(200, "make me bigger")], None)}, [])
1203 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1205 write("si1", secrets(0),
1206 {0: ([], [(500, "make me really bigger")], None)}, [])
1207 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1209 # now cancel them all
1210 ss.remote_cancel_lease("si1", secrets(0)[2])
1211 ss.remote_cancel_lease("si1", secrets(1)[2])
1212 ss.remote_cancel_lease("si1", secrets(2)[2])
1213 ss.remote_cancel_lease("si1", secrets(3)[2])
1215 # the slot should still be there
1216 remaining_shares = read("si1", [], [(0,10)])
1217 self.failUnlessEqual(len(remaining_shares), 1)
1218 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1220 # cancelling a non-existent lease should raise an IndexError
1221 self.failUnlessRaises(IndexError,
1222 ss.remote_cancel_lease, "si1", "nonsecret")
1224 # and the slot should still be there
1225 remaining_shares = read("si1", [], [(0,10)])
1226 self.failUnlessEqual(len(remaining_shares), 1)
1227 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1229 ss.remote_cancel_lease("si1", secrets(4)[2])
1230 # now the slot should be gone
1231 no_shares = read("si1", [], [(0,10)])
1232 self.failUnlessEqual(no_shares, {})
1234 # cancelling a lease on a non-existent share should raise an IndexError
1235 self.failUnlessRaises(IndexError,
1236 ss.remote_cancel_lease, "si2", "nonsecret")
1238 def test_remove(self):
1239 ss = self.create("test_remove")
1240 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1242 readv = ss.remote_slot_readv
1243 writev = ss.remote_slot_testv_and_readv_and_writev
1244 secrets = ( self.write_enabler("we1"),
1245 self.renew_secret("we1"),
1246 self.cancel_secret("we1") )
1247 # delete sh0 by setting its size to zero
1248 answer = writev("si1", secrets,
1251 # the answer should mention all the shares that existed before the
1253 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1254 # but a new read should show only sh1 and sh2
1255 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258 # delete sh1 by setting its size to zero
1259 answer = writev("si1", secrets,
1262 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1263 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1266 # delete sh2 by setting its size to zero
1267 answer = writev("si1", secrets,
1270 self.failUnlessEqual(answer, (True, {2:[]}) )
1271 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273 # and the bucket directory should now be gone
1274 si = base32.b2a("si1")
1275 # note: this is a detail of the storage server implementation, and
1276 # may change in the future
1278 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1279 bucketdir = os.path.join(prefixdir, si)
1280 self.failUnless(os.path.exists(prefixdir), prefixdir)
1281 self.failIf(os.path.exists(bucketdir), bucketdir)
1283 class Stats(unittest.TestCase):
1286 self.sparent = LoggingServiceParent()
1287 self._lease_secret = itertools.count()
1289 return self.sparent.stopService()
1291 def workdir(self, name):
1292 basedir = os.path.join("storage", "Server", name)
1295 def create(self, name):
1296 workdir = self.workdir(name)
1297 ss = StorageServer(workdir, "\x00" * 20)
1298 ss.setServiceParent(self.sparent)
def test_latencies(self):
    """Check the latency statistics reported by ss.get_latencies().

    Four categories are fed with known samples and the reported mean and
    percentiles are compared (to within 1) against hand-computed values.
    The "allocate" category receives 10000 samples, but the assertions
    expect only 1000 to be held in ss.latencies, with statistics that match
    the most recent 1000 values.
    """
    ss = self.create("test_latencies")
    for i in range(10000):
        ss.add_latency("allocate", 1.0 * i)
    for i in range(1000):
        ss.add_latency("renew", 1.0 * i)
    # The checks below require exactly ten "cancel" samples (0, 2, ..., 18):
    # len == 10, mean 9, median 10, top percentiles 18.
    for i in range(10):
        ss.add_latency("cancel", 2.0 * i)
    ss.add_latency("get", 5.0)

    output = ss.get_latencies()

    self.failUnlessEqual(sorted(output.keys()),
                         sorted(["allocate", "renew", "cancel", "get"]))

    stat_names = ["mean",
                  "01_0_percentile", "10_0_percentile", "50_0_percentile",
                  "90_0_percentile", "95_0_percentile", "99_0_percentile",
                  "99_9_percentile"]
    # (category, number of retained samples, expected value per stat_names)
    expected = [
        ("allocate", 1000,
         [9500, 9010, 9100, 9500, 9900, 9950, 9990, 9999]),
        ("renew", 1000,
         [500, 10, 100, 500, 900, 950, 990, 999]),
        ("cancel", 10,
         [9, 0, 2, 10, 18, 18, 18, 18]),
        ("get", 1,
         [5, 5, 5, 5, 5, 5, 5, 5]),
    ]
    for category, retained, values in expected:
        self.failUnlessEqual(len(ss.latencies[category]), retained)
        for stat_name, value in zip(stat_names, values):
            # pass the whole output dict as the failure message, so a
            # mismatch shows every computed statistic
            self.failUnless(abs(output[category][stat_name] - value) < 1,
                            output)
1356 s = re.sub(r'<[^>]*>', ' ', s)
1357 s = re.sub(r'\s+', ' ', s)
class MyBucketCountingCrawler(BucketCountingCrawler):
    """BucketCountingCrawler that signals a test each time a prefix finishes.

    The test assigns a list of Deferreds to self.hook_ds before the crawler
    starts; one Deferred is consumed and fired per finished prefix, in order.
    """

    def finished_prefix(self, cycle, prefix):
        """Run the normal end-of-prefix handling, then fire the next hook."""
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # The test supplies fewer hooks than there are prefixes, so stop
        # quietly once the list is exhausted instead of raising IndexError.
        if self.hook_ds:
            d = self.hook_ds.pop(0)
            # The hook callbacks take a single ignored argument.
            d.callback(None)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler."""

    def add_bucket_counter(self):
        """Build the bucket-counting crawler and attach it to this server.

        The crawler keeps its state in "bucket_counter.state" under
        self.storedir, is stored as self.bucket_counter, and is parented to
        this service.
        """
        crawler = MyBucketCountingCrawler(
            self, os.path.join(self.storedir, "bucket_counter.state"))
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
1373 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1376 self.s = service.MultiService()
1377 self.s.startService()
1379 return self.s.stopService()
1381 def test_bucket_counter(self):
1382 basedir = "storage/BucketCounter/bucket_counter"
1383 fileutil.make_dirs(basedir)
1384 ss = StorageServer(basedir, "\x00" * 20)
1385 # to make sure we capture the bucket-counting-crawler in the middle
1386 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1387 # also make it start sooner than usual.
1388 ss.bucket_counter.slow_start = 0
1389 orig_cpu_slice = ss.bucket_counter.cpu_slice
1390 ss.bucket_counter.cpu_slice = 0
1391 ss.setServiceParent(self.s)
1393 w = StorageStatus(ss)
1395 # this sample is before the crawler has started doing anything
1396 html = w.renderSynchronously()
1397 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1398 s = remove_tags(html)
1399 self.failUnlessIn("Accepting new shares: Yes", s)
1400 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1401 self.failUnlessIn("Total buckets: Not computed yet", s)
1402 self.failUnlessIn("Next crawl in", s)
1404 # give the bucket-counting-crawler one tick to get started. The
1405 # cpu_slice=0 will force it to yield right after it processes the
1408 d = fireEventually()
1409 def _check(ignored):
1410 # are we really right after the first prefix?
1411 state = ss.bucket_counter.get_state()
1412 if state["last-complete-prefix"] is None:
1413 d2 = fireEventually()
1414 d2.addCallback(_check)
1416 self.failUnlessEqual(state["last-complete-prefix"],
1417 ss.bucket_counter.prefixes[0])
1418 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1419 html = w.renderSynchronously()
1420 s = remove_tags(html)
1421 self.failUnlessIn(" Current crawl ", s)
1422 self.failUnlessIn(" (next work in ", s)
1423 d.addCallback(_check)
1425 # now give it enough time to complete a full cycle
1427 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1428 d.addCallback(lambda ignored: self.poll(_watch))
1429 def _check2(ignored):
1430 ss.bucket_counter.cpu_slice = orig_cpu_slice
1431 html = w.renderSynchronously()
1432 s = remove_tags(html)
1433 self.failUnlessIn("Total buckets: 0 (the number of", s)
1434 self.failUnlessIn("Next crawl in 59 minutes", s)
1435 d.addCallback(_check2)
1438 def test_bucket_counter_cleanup(self):
1439 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1440 fileutil.make_dirs(basedir)
1441 ss = StorageServer(basedir, "\x00" * 20)
1442 # to make sure we capture the bucket-counting-crawler in the middle
1443 # of a cycle, we reach in and reduce its maximum slice time to 0.
1444 ss.bucket_counter.slow_start = 0
1445 orig_cpu_slice = ss.bucket_counter.cpu_slice
1446 ss.bucket_counter.cpu_slice = 0
1447 ss.setServiceParent(self.s)
1449 d = fireEventually()
1451 def _after_first_prefix(ignored):
1452 state = ss.bucket_counter.state
1453 if state["last-complete-prefix"] is None:
1454 d2 = fireEventually()
1455 d2.addCallback(_after_first_prefix)
1457 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1458 # now sneak in and mess with its state, to make sure it cleans up
1459 # properly at the end of the cycle
1460 self.failUnlessEqual(state["last-complete-prefix"],
1461 ss.bucket_counter.prefixes[0])
1462 state["bucket-counts"][-12] = {}
1463 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1464 ss.bucket_counter.save_state()
1465 d.addCallback(_after_first_prefix)
1467 # now give it enough time to complete a cycle
1469 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1470 d.addCallback(lambda ignored: self.poll(_watch))
1471 def _check2(ignored):
1472 ss.bucket_counter.cpu_slice = orig_cpu_slice
1473 s = ss.bucket_counter.get_state()
1474 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1475 self.failIf("bogusprefix!" in s["storage-index-samples"],
1476 s["storage-index-samples"].keys())
1477 d.addCallback(_check2)
1480 def test_bucket_counter_eta(self):
1481 basedir = "storage/BucketCounter/bucket_counter_eta"
1482 fileutil.make_dirs(basedir)
1483 ss = MyStorageServer(basedir, "\x00" * 20)
1484 ss.bucket_counter.slow_start = 0
1485 # these will be fired inside finished_prefix()
1486 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1487 w = StorageStatus(ss)
1489 d = defer.Deferred()
1491 def _check_1(ignored):
1492 # no ETA is available yet
1493 html = w.renderSynchronously()
1494 s = remove_tags(html)
1495 self.failUnlessIn("complete (next work", s)
1497 def _check_2(ignored):
1498 # one prefix has finished, so an ETA based upon that elapsed time
1499 # should be available.
1500 html = w.renderSynchronously()
1501 s = remove_tags(html)
1502 self.failUnlessIn("complete (ETA ", s)
1504 def _check_3(ignored):
1505 # two prefixes have finished
1506 html = w.renderSynchronously()
1507 s = remove_tags(html)
1508 self.failUnlessIn("complete (ETA ", s)
1511 hooks[0].addCallback(_check_1).addErrback(d.errback)
1512 hooks[1].addCallback(_check_2).addErrback(d.errback)
1513 hooks[2].addCallback(_check_3).addErrback(d.errback)
1515 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that a test can pause after its first bucket.

    A test sets stop_after_first_bucket to True before starting the crawler.
    The flag is cleared once the first bucket has been processed.
    """

    # one-shot flag; see process_bucket() and yielding()
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        """Process the bucket normally, then honour a pending stop request."""
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        # presumably a negative slice makes the crawler yield right away --
        # confirm against the crawler's time-slicing code
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        """Raise cpu_slice to 500 unless a stop request is still pending."""
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
1529 class BrokenStatResults:
1531 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1534 bsr = BrokenStatResults()
1535 for attrname in dir(s):
1536 if attrname.startswith("_"):
1538 if attrname == "st_blocks":
1540 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    """StorageServer configured with the instrumented lease crawler.

    Only the LeaseCheckerClass attribute is overridden; presumably
    StorageServer instantiates it to build ss.lease_checker -- confirm
    against StorageServer.
    """
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer configured with No_ST_BLOCKS_LeaseCheckingCrawler.

    Only the LeaseCheckerClass attribute is overridden; presumably
    StorageServer instantiates it to build ss.lease_checker -- confirm
    against StorageServer.
    """
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1548 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1551 self.s = service.MultiService()
1552 self.s.startService()
1554 return self.s.stopService()
1556 def make_shares(self, ss):
1558 return (si, hashutil.tagged_hash("renew", si),
1559 hashutil.tagged_hash("cancel", si))
1560 def make_mutable(si):
1561 return (si, hashutil.tagged_hash("renew", si),
1562 hashutil.tagged_hash("cancel", si),
1563 hashutil.tagged_hash("write-enabler", si))
1564 def make_extra_lease(si, num):
1565 return (hashutil.tagged_hash("renew-%d" % num, si),
1566 hashutil.tagged_hash("cancel-%d" % num, si))
1568 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1569 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1570 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1571 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1572 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1573 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1575 canary = FakeCanary()
1576 # note: 'tahoe debug dump-share' will not handle this file, since the
1577 # inner contents are not a valid CHK share
1578 data = "\xff" * 1000
1580 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1582 w[0].remote_write(0, data)
1585 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1587 w[0].remote_write(0, data)
1589 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1591 writev = ss.remote_slot_testv_and_readv_and_writev
1592 writev(mutable_si_2, (we2, rs2, cs2),
1593 {0: ([], [(0,data)], len(data))}, [])
1594 writev(mutable_si_3, (we3, rs3, cs3),
1595 {0: ([], [(0,data)], len(data))}, [])
1596 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1598 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1599 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1600 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1602 def test_basic(self):
1603 basedir = "storage/LeaseCrawler/basic"
1604 fileutil.make_dirs(basedir)
1605 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1606 # make it start sooner than usual.
1607 lc = ss.lease_checker
1610 lc.stop_after_first_bucket = True
1611 webstatus = StorageStatus(ss)
1613 # create a few shares, with some leases on them
1614 self.make_shares(ss)
1615 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1617 # add a non-sharefile to exercise another code path
1618 fn = os.path.join(ss.sharedir,
1619 storage_index_to_dir(immutable_si_0),
1622 f.write("I am not a share.\n")
1625 # this is before the crawl has started, so we're not in a cycle yet
1626 initial_state = lc.get_state()
1627 self.failIf(lc.get_progress()["cycle-in-progress"])
1628 self.failIfIn("cycle-to-date", initial_state)
1629 self.failIfIn("estimated-remaining-cycle", initial_state)
1630 self.failIfIn("estimated-current-cycle", initial_state)
1631 self.failUnlessIn("history", initial_state)
1632 self.failUnlessEqual(initial_state["history"], {})
1634 ss.setServiceParent(self.s)
1638 d = fireEventually()
1640 # now examine the state right after the first bucket has been
1642 def _after_first_bucket(ignored):
1643 initial_state = lc.get_state()
1644 if "cycle-to-date" not in initial_state:
1645 d2 = fireEventually()
1646 d2.addCallback(_after_first_bucket)
1648 self.failUnlessIn("cycle-to-date", initial_state)
1649 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1650 self.failUnlessIn("estimated-current-cycle", initial_state)
1651 self.failUnlessIn("history", initial_state)
1652 self.failUnlessEqual(initial_state["history"], {})
1654 so_far = initial_state["cycle-to-date"]
1655 self.failUnlessEqual(so_far["expiration-enabled"], False)
1656 self.failUnlessIn("configured-expiration-mode", so_far)
1657 self.failUnlessIn("lease-age-histogram", so_far)
1658 lah = so_far["lease-age-histogram"]
1659 self.failUnlessEqual(type(lah), list)
1660 self.failUnlessEqual(len(lah), 1)
1661 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1662 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1663 self.failUnlessEqual(so_far["corrupt-shares"], [])
1664 sr1 = so_far["space-recovered"]
1665 self.failUnlessEqual(sr1["examined-buckets"], 1)
1666 self.failUnlessEqual(sr1["examined-shares"], 1)
1667 self.failUnlessEqual(sr1["actual-shares"], 0)
1668 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1669 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1670 left = initial_state["estimated-remaining-cycle"]
1671 sr2 = left["space-recovered"]
1672 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1673 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1674 self.failIfEqual(sr2["actual-shares"], None)
1675 self.failIfEqual(sr2["configured-diskbytes"], None)
1676 self.failIfEqual(sr2["original-sharebytes"], None)
1677 d.addCallback(_after_first_bucket)
1678 d.addCallback(lambda ign: self.render1(webstatus))
1679 def _check_html_in_cycle(html):
1680 s = remove_tags(html)
1681 self.failUnlessIn("So far, this cycle has examined "
1682 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1683 self.failUnlessIn("and has recovered: "
1684 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1685 "0 B (0 B / 0 B)", s)
1686 self.failUnlessIn("If expiration were enabled, "
1687 "we would have recovered: "
1688 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1689 " 0 B (0 B / 0 B) by now", s)
1690 self.failUnlessIn("and the remainder of this cycle "
1691 "would probably recover: "
1692 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1693 " 0 B (0 B / 0 B)", s)
1694 self.failUnlessIn("and the whole cycle would probably recover: "
1695 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1696 " 0 B (0 B / 0 B)", s)
1697 self.failUnlessIn("if we were strictly using each lease's default "
1698 "31-day lease lifetime", s)
1699 self.failUnlessIn("this cycle would be expected to recover: ", s)
1700 d.addCallback(_check_html_in_cycle)
1702 # wait for the crawler to finish the first cycle. Nothing should have
1705 return bool(lc.get_state()["last-cycle-finished"] is not None)
1706 d.addCallback(lambda ign: self.poll(_wait))
1708 def _after_first_cycle(ignored):
1710 self.failIf("cycle-to-date" in s)
1711 self.failIf("estimated-remaining-cycle" in s)
1712 self.failIf("estimated-current-cycle" in s)
1713 last = s["history"][0]
1714 self.failUnlessIn("cycle-start-finish-times", last)
1715 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1716 self.failUnlessEqual(last["expiration-enabled"], False)
1717 self.failUnlessIn("configured-expiration-mode", last)
1719 self.failUnlessIn("lease-age-histogram", last)
1720 lah = last["lease-age-histogram"]
1721 self.failUnlessEqual(type(lah), list)
1722 self.failUnlessEqual(len(lah), 1)
1723 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1725 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1726 self.failUnlessEqual(last["corrupt-shares"], [])
1728 rec = last["space-recovered"]
1729 self.failUnlessEqual(rec["examined-buckets"], 4)
1730 self.failUnlessEqual(rec["examined-shares"], 4)
1731 self.failUnlessEqual(rec["actual-buckets"], 0)
1732 self.failUnlessEqual(rec["original-buckets"], 0)
1733 self.failUnlessEqual(rec["configured-buckets"], 0)
1734 self.failUnlessEqual(rec["actual-shares"], 0)
1735 self.failUnlessEqual(rec["original-shares"], 0)
1736 self.failUnlessEqual(rec["configured-shares"], 0)
1737 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1738 self.failUnlessEqual(rec["original-diskbytes"], 0)
1739 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1740 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1741 self.failUnlessEqual(rec["original-sharebytes"], 0)
1742 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1744 def _get_sharefile(si):
1745 return list(ss._iter_share_files(si))[0]
1746 def count_leases(si):
1747 return len(list(_get_sharefile(si).get_leases()))
1748 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1749 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1750 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1751 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1752 d.addCallback(_after_first_cycle)
1753 d.addCallback(lambda ign: self.render1(webstatus))
1754 def _check_html(html):
1755 s = remove_tags(html)
1756 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1757 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1758 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1759 "(2 mutable / 2 immutable),", s)
1760 self.failUnlessIn("but expiration was not enabled", s)
1761 d.addCallback(_check_html)
1762 d.addCallback(lambda ign: self.render_json(webstatus))
1763 def _check_json(json):
1764 data = simplejson.loads(json)
1765 self.failUnlessIn("lease-checker", data)
1766 self.failUnlessIn("lease-checker-progress", data)
1767 d.addCallback(_check_json)
1770 def backdate_lease(self, sf, renew_secret, new_expire_time):
1771 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1772 # "renew" a lease with a new_expire_time that is older than what the
1773 # current lease has), so we have to reach inside it.
1774 for i,lease in enumerate(sf.get_leases()):
1775 if lease.renew_secret == renew_secret:
1776 lease.expiration_time = new_expire_time
1777 f = open(sf.home, 'rb+')
1778 sf._write_lease_record(f, i, lease)
1781 raise IndexError("unable to renew non-existent lease")
1783 def test_expire_age(self):
1784 basedir = "storage/LeaseCrawler/expire_age"
1785 fileutil.make_dirs(basedir)
1786 # setting expiration_time to 2000 means that any lease which is more
1787 # than 2000s old will be expired.
1788 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1789 expiration_enabled=True,
1790 expiration_mode="age",
1791 expiration_override_lease_duration=2000)
1792 # make it start sooner than usual.
1793 lc = ss.lease_checker
1795 lc.stop_after_first_bucket = True
1796 webstatus = StorageStatus(ss)
1798 # create a few shares, with some leases on them
1799 self.make_shares(ss)
1800 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1802 def count_shares(si):
1803 return len(list(ss._iter_share_files(si)))
1804 def _get_sharefile(si):
1805 return list(ss._iter_share_files(si))[0]
1806 def count_leases(si):
1807 return len(list(_get_sharefile(si).get_leases()))
1809 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1810 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1811 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1812 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1813 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1814 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1815 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1816 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1818 # artificially crank back the expiration time on the first lease of
1819 # each share, to make it look like it expired already (age=1000s).
1820 # Some shares have an extra lease which is set to expire at the
1821 # default time in 31 days from now (age=31days). We then run the
1822 # crawler, which will expire the first lease, making some shares get
1823 # deleted and others stay alive (with one remaining lease)
1826 sf0 = _get_sharefile(immutable_si_0)
1827 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1828 sf0_size = os.stat(sf0.home).st_size
1830 # immutable_si_1 gets an extra lease
1831 sf1 = _get_sharefile(immutable_si_1)
1832 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1834 sf2 = _get_sharefile(mutable_si_2)
1835 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1836 sf2_size = os.stat(sf2.home).st_size
1838 # mutable_si_3 gets an extra lease
1839 sf3 = _get_sharefile(mutable_si_3)
1840 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1842 ss.setServiceParent(self.s)
1844 d = fireEventually()
1845 # examine the state right after the first bucket has been processed
1846 def _after_first_bucket(ignored):
1847 p = lc.get_progress()
1848 if not p["cycle-in-progress"]:
1849 d2 = fireEventually()
1850 d2.addCallback(_after_first_bucket)
1852 d.addCallback(_after_first_bucket)
1853 d.addCallback(lambda ign: self.render1(webstatus))
1854 def _check_html_in_cycle(html):
1855 s = remove_tags(html)
1856 # the first bucket encountered gets deleted, and its prefix
1857 # happens to be about 1/5th of the way through the ring, so the
1858 # predictor thinks we'll have 5 shares and that we'll delete them
1859 # all. This part of the test depends upon the SIs landing right
1860 # where they do now.
1861 self.failUnlessIn("The remainder of this cycle is expected to "
1862 "recover: 4 shares, 4 buckets", s)
1863 self.failUnlessIn("The whole cycle is expected to examine "
1864 "5 shares in 5 buckets and to recover: "
1865 "5 shares, 5 buckets", s)
1866 d.addCallback(_check_html_in_cycle)
1868 # wait for the crawler to finish the first cycle. Two shares should
1871 return bool(lc.get_state()["last-cycle-finished"] is not None)
1872 d.addCallback(lambda ign: self.poll(_wait))
1874 def _after_first_cycle(ignored):
1875 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1876 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1877 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1878 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1879 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1880 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1883 last = s["history"][0]
1885 self.failUnlessEqual(last["expiration-enabled"], True)
1886 self.failUnlessEqual(last["configured-expiration-mode"],
1887 ("age", 2000, None, ("mutable", "immutable")))
1888 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1890 rec = last["space-recovered"]
1891 self.failUnlessEqual(rec["examined-buckets"], 4)
1892 self.failUnlessEqual(rec["examined-shares"], 4)
1893 self.failUnlessEqual(rec["actual-buckets"], 2)
1894 self.failUnlessEqual(rec["original-buckets"], 2)
1895 self.failUnlessEqual(rec["configured-buckets"], 2)
1896 self.failUnlessEqual(rec["actual-shares"], 2)
1897 self.failUnlessEqual(rec["original-shares"], 2)
1898 self.failUnlessEqual(rec["configured-shares"], 2)
1899 size = sf0_size + sf2_size
1900 self.failUnlessEqual(rec["actual-sharebytes"], size)
1901 self.failUnlessEqual(rec["original-sharebytes"], size)
1902 self.failUnlessEqual(rec["configured-sharebytes"], size)
1903 # different platforms have different notions of "blocks used by
1904 # this file", so merely assert that it's a number
1905 self.failUnless(rec["actual-diskbytes"] >= 0,
1906 rec["actual-diskbytes"])
1907 self.failUnless(rec["original-diskbytes"] >= 0,
1908 rec["original-diskbytes"])
1909 self.failUnless(rec["configured-diskbytes"] >= 0,
1910 rec["configured-diskbytes"])
1911 d.addCallback(_after_first_cycle)
1912 d.addCallback(lambda ign: self.render1(webstatus))
1913 def _check_html(html):
1914 s = remove_tags(html)
1915 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1916 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1917 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1918 d.addCallback(_check_html)
def test_expire_cutoff_date(self):
    # Run the lease checker in "cutoff-date" mode and check that leases
    # renewed before the cutoff are dropped, that shares left without any
    # lease are deleted, and that the status page reports the results.
    basedir = "storage/LeaseCrawler/expire_cutoff_date"
    fileutil.make_dirs(basedir)
    # setting cutoff-date to 2000 seconds ago means that any lease which
    # is more than 2000s old will be expired.
    now = time.time()
    then = int(now - 2000)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                   expiration_enabled=True,
                                   expiration_mode="cutoff-date",
                                   expiration_cutoff_date=then)
    # make it start sooner than usual.
    # NOTE(review): assumes the crawler's startup delay is controlled by a
    # `slow_start` attribute -- confirm against the crawler implementation.
    lc = ss.lease_checker
    lc.slow_start = 0
    lc.stop_after_first_bucket = True
    webstatus = StorageStatus(ss)

    # create a few shares, with some leases on them
    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    self.failUnlessEqual(count_shares(immutable_si_0), 1)
    self.failUnlessEqual(count_leases(immutable_si_0), 1)
    self.failUnlessEqual(count_shares(immutable_si_1), 1)
    self.failUnlessEqual(count_leases(immutable_si_1), 2)
    self.failUnlessEqual(count_shares(mutable_si_2), 1)
    self.failUnlessEqual(count_leases(mutable_si_2), 1)
    self.failUnlessEqual(count_shares(mutable_si_3), 1)
    self.failUnlessEqual(count_leases(mutable_si_3), 2)

    # artificially crank back the expiration time on the first lease of
    # each share, to make it look like it was renewed 3000s ago. To
    # achieve this, we need to set the expiration time to
    # now-3000+31days. This will change when the lease format is improved
    # to contain both create/renew time and duration.
    new_expiration_time = now - 3000 + 31*24*60*60

    # Some shares have an extra lease which is set to expire at the
    # default time in 31 days from now (age=31days). We then run the
    # crawler, which will expire the first lease, making some shares get
    # deleted and others stay alive (with one remaining lease)

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf0_size = os.stat(sf0.home).st_size

    # immutable_si_1 gets an extra lease
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf2_size = os.stat(sf2.home).st_size

    # mutable_si_3 gets an extra lease
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

    ss.setServiceParent(self.s)

    d = fireEventually()
    # examine the state right after the first bucket has been processed
    def _after_first_bucket(ignored):
        p = lc.get_progress()
        if not p["cycle-in-progress"]:
            # the cycle has not started yet: look again on a later turn,
            # and return the new Deferred so the outer chain waits for it
            d2 = fireEventually()
            d2.addCallback(_after_first_bucket)
            return d2
    d.addCallback(_after_first_bucket)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html_in_cycle(html):
        s = remove_tags(html)
        # the first bucket encountered gets deleted, and its prefix
        # happens to be about 1/5th of the way through the ring, so the
        # predictor thinks we'll have 5 shares and that we'll delete them
        # all. This part of the test depends upon the SIs landing right
        # where they do now.
        self.failUnlessIn("The remainder of this cycle is expected to "
                          "recover: 4 shares, 4 buckets", s)
        self.failUnlessIn("The whole cycle is expected to examine "
                          "5 shares in 5 buckets and to recover: "
                          "5 shares, 5 buckets", s)
    d.addCallback(_check_html_in_cycle)

    # wait for the crawler to finish the first cycle. Two shares should
    # have been removed by then.
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 1)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 1)

        s = lc.get_state()
        last = s["history"][0]

        self.failUnlessEqual(last["expiration-enabled"], True)
        self.failUnlessEqual(last["configured-expiration-mode"],
                             ("cutoff-date", None, then,
                              ("mutable", "immutable")))
        # two shares started with one lease and two with two leases
        self.failUnlessEqual(last["leases-per-share-histogram"],
                             {1: 2, 2: 2})

        rec = last["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 4)
        self.failUnlessEqual(rec["examined-shares"], 4)
        self.failUnlessEqual(rec["actual-buckets"], 2)
        self.failUnlessEqual(rec["original-buckets"], 0)
        self.failUnlessEqual(rec["configured-buckets"], 2)
        self.failUnlessEqual(rec["actual-shares"], 2)
        self.failUnlessEqual(rec["original-shares"], 0)
        self.failUnlessEqual(rec["configured-shares"], 2)
        size = sf0_size + sf2_size
        self.failUnlessEqual(rec["actual-sharebytes"], size)
        self.failUnlessEqual(rec["original-sharebytes"], 0)
        self.failUnlessEqual(rec["configured-sharebytes"], size)
        # different platforms have different notions of "blocks used by
        # this file", so merely assert that it's a number
        self.failUnless(rec["actual-diskbytes"] >= 0,
                        rec["actual-diskbytes"])
        self.failUnless(rec["original-diskbytes"] >= 0,
                        rec["original-diskbytes"])
        self.failUnless(rec["configured-diskbytes"] >= 0,
                        rec["configured-diskbytes"])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Expiration Enabled:"
                          " expired leases will be removed", s)
        date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
        substr = "Leases created or last renewed before %s will be considered expired." % date
        self.failUnlessIn(substr, s)
        self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
    d.addCallback(_check_html)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_only_immutable(self):
    # With expiration_sharetypes=("immutable",), only immutable shares may
    # be expired: every lease is backdated past the cutoff, yet the
    # mutable shares and their leases must survive the first cycle.
    basedir = "storage/LeaseCrawler/only_immutable"
    fileutil.make_dirs(basedir)
    now = time.time()
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("immutable",))
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's startup delay is controlled by a
    # `slow_start` attribute -- confirm against the crawler implementation.
    lc.slow_start = 0
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # poll until the lease checker has completed one full cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        # both immutable shares are gone, both mutable shares untouched
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 0)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
    d.addCallback(_check_html)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_only_mutable(self):
    # With expiration_sharetypes=("mutable",), only mutable shares may be
    # expired: every lease is backdated past the cutoff, yet the immutable
    # shares and their leases must survive the first cycle.
    basedir = "storage/LeaseCrawler/only_mutable"
    fileutil.make_dirs(basedir)
    now = time.time()
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("mutable",))
    lc = ss.lease_checker
    # NOTE(review): assumes the crawler's startup delay is controlled by a
    # `slow_start` attribute -- confirm against the crawler implementation.
    lc.slow_start = 0
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # poll until the lease checker has completed one full cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        # both immutable shares untouched, both mutable shares are gone
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 0)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
    d.addCallback(_check_html)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_bad_mode(self):
    # Constructing a StorageServer with an unrecognised expiration_mode
    # must raise ValueError, and the message must name the valid modes.
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    nodeid = "\x00" * 20
    err = self.failUnlessRaises(ValueError, StorageServer, workdir, nodeid,
                                expiration_mode="bogus")
    expected = "GC mode 'bogus' must be 'age' or 'cutoff-date'"
    self.failUnlessIn(expected, str(err))
def test_parse_duration(self):
    # parse_duration() turns strings such as "7days" or "2mo" into a
    # number of seconds, and rejects strings without a recognised unit.
    # NOTE(review): the unit constants were missing from this block; a
    # 31-day month and a 365-day year are assumed to match what
    # time_format.parse_duration uses -- confirm.
    DAY = 24*60*60
    MONTH = 31*DAY
    YEAR = 365*DAY
    p = time_format.parse_duration
    self.failUnlessEqual(p("7days"), 7*DAY)
    self.failUnlessEqual(p("31day"), 31*DAY)
    self.failUnlessEqual(p("60 days"), 60*DAY)
    self.failUnlessEqual(p("2mo"), 2*MONTH)
    self.failUnlessEqual(p("3 month"), 3*MONTH)
    self.failUnlessEqual(p("2years"), 2*YEAR)
    e = self.failUnlessRaises(ValueError, p, "2kumquats")
    self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    # parse_date() must return an int; 1237334400 is the epoch timestamp
    # of 2009-03-18 00:00:00 UTC.
    parse = time_format.parse_date
    iso_day = "2009-03-18"
    self.failUnless(isinstance(parse(iso_day), int), parse(iso_day))
    self.failUnlessEqual(parse(iso_day), 1237334400)
def test_limited_history(self):
    # Let the lease checker run 15 cycles and check that its history keeps
    # only the 10 most recent ones (cycles 6 through 15).
    basedir = "storage/LeaseCrawler/limited_history"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    # NOTE(review): the statements that did this were missing from the
    # block. `slow_start` and `cpu_slice` are assumed to be the crawler's
    # startup delay and per-slice time budget -- confirm against the
    # crawler implementation.
    lc = ss.lease_checker
    lc.slow_start = 0
    lc.cpu_slice = 500

    # create a few shares, with some leases on them
    self.make_shares(ss)

    ss.setServiceParent(self.s)

    def _wait_until_15_cycles_done():
        last = lc.state["last-cycle-finished"]
        if last is not None and last >= 15:
            return True
        # NOTE(review): assumes `lc.timer` is the pending delayed call for
        # the next crawl; firing it immediately keeps 15 cycles from
        # taking the crawler's normal inter-cycle delay -- confirm.
        if lc.timer:
            lc.timer.reset(0)
        return False
    d = self.poll(_wait_until_15_cycles_done)

    def _check(ignored):
        s = lc.get_state()
        h = s["history"]
        self.failUnlessEqual(len(h), 10)
        self.failUnlessEqual(max(h.keys()), 15)
        self.failUnlessEqual(min(h.keys()), 6)
    d.addCallback(_check)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_unpredictable_future(self):
    # Interrupt the crawler so early in its first cycle that it cannot
    # extrapolate, and check that every space-recovered estimate (for the
    # remainder of the cycle and for the whole cycle) is reported as None.
    basedir = "storage/LeaseCrawler/unpredictable_future"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    # NOTE(review): assumes the crawler's startup delay is controlled by a
    # `slow_start` attribute -- confirm against the crawler implementation.
    lc = ss.lease_checker
    lc.slow_start = 0
    lc.cpu_slice = -1.0 # stop quickly

    self.make_shares(ss)

    ss.setServiceParent(self.s)

    d = fireEventually()
    def _check(ignored):
        # this should fire after the first bucket is complete, but before
        # the first prefix is complete, so the progress-measurer won't
        # think we've gotten far enough to raise our percent-complete
        # above 0%, triggering the cannot-predict-the-future code in
        # expirer.py . This will have to change if/when the
        # progress-measurer gets smart enough to count buckets (we'll
        # have to interrupt it even earlier, before it's finished the
        # first bucket).
        s = lc.get_state()
        if "cycle-to-date" not in s:
            # the cycle has not started yet: look again on a later turn,
            # and return the new Deferred so the outer chain waits for it
            d2 = fireEventually()
            d2.addCallback(_check)
            return d2
        self.failUnlessIn("cycle-to-date", s)
        self.failUnlessIn("estimated-remaining-cycle", s)
        self.failUnlessIn("estimated-current-cycle", s)

        # Both estimates must be all-None. The second key used to be
        # "estimated-remaining-cycle" again, which left the whole-cycle
        # estimate unchecked.
        for estimate in ("estimated-remaining-cycle",
                         "estimated-current-cycle"):
            sr = s[estimate]["space-recovered"]
            for which in ("actual", "original", "configured"):
                for what in ("buckets", "shares",
                             "diskbytes", "sharebytes"):
                    key = "%s-%s" % (which, what)
                    self.failUnlessEqual(sr[key], None,
                                         "%s[%s]" % (estimate, key))

    d.addCallback(_check)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_no_st_blocks(self):
    # Check that when os.stat() results carry no .st_blocks field, the
    # lease checker reports disk bytes equal to share bytes.
    basedir = "storage/LeaseCrawler/no_st_blocks"
    fileutil.make_dirs(basedir)
    ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                    expiration_mode="age",
                                    expiration_override_lease_duration=-1000)
    # a negative expiration_time= means the "configured-"
    # space-recovered counts will be non-zero, since all shares will have
    # expired by now

    # make it start sooner than usual.
    # NOTE(review): assumes the crawler's startup delay is controlled by a
    # `slow_start` attribute -- confirm against the crawler implementation.
    lc = ss.lease_checker
    lc.slow_start = 0

    self.make_shares(ss)
    ss.setServiceParent(self.s)
    # poll until the lease checker has completed one full cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _check(ignored):
        s = lc.get_state()
        last = s["history"][0]
        rec = last["space-recovered"]
        self.failUnlessEqual(rec["configured-buckets"], 4)
        self.failUnlessEqual(rec["configured-shares"], 4)
        self.failUnless(rec["configured-sharebytes"] > 0,
                        rec["configured-sharebytes"])
        # without the .st_blocks field in os.stat() results, we should be
        # reporting diskbytes==sharebytes
        self.failUnlessEqual(rec["configured-sharebytes"],
                             rec["configured-diskbytes"])
    d.addCallback(_check)
    # hand the Deferred to trial so the test waits for the whole chain
    return d
def test_share_corruption(self):
    # Corrupt one share and add an empty bucket, then check that the lease
    # checker keeps going and reports the corrupt share in its state, in
    # the JSON status and in the HTML status, both mid-cycle and in the
    # history of the finished cycle.
    self._poll_should_ignore_these_errors = [
        UnknownMutableContainerVersionError,
        UnknownImmutableContainerVersionError,
        ]
    basedir = "storage/LeaseCrawler/share_corruption"
    fileutil.make_dirs(basedir)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20)
    w = StorageStatus(ss)
    # make it start sooner than usual.
    # NOTE(review): the statements after stop_after_first_bucket were
    # missing from the block. `slow_start` and `cpu_slice` are assumed to
    # be the crawler's startup delay and per-slice time budget -- confirm
    # against the crawler implementation.
    lc = ss.lease_checker
    lc.stop_after_first_bucket = True
    lc.slow_start = 0
    lc.cpu_slice = 500

    # create a few shares, with some leases on them
    self.make_shares(ss)

    # now corrupt one, and make sure the lease-checker keeps going
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    first = min(self.sis)
    first_b32 = base32.b2a(first)
    fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
    # overwrite the start of the share file in place; try/finally makes
    # sure the handle is closed even if the write fails
    f = open(fn, "rb+")
    try:
        f.seek(0)
        f.write("BAD MAGIC")
    finally:
        f.close()
    # if get_share_file() doesn't see the correct mutable magic, it
    # assumes the file is an immutable share, and then
    # immutable.ShareFile sees a bad version. So regardless of which kind
    # of share we corrupted, this will trigger an
    # UnknownImmutableContainerVersionError.

    # also create an empty bucket
    empty_si = base32.b2a("\x04"*16)
    empty_bucket_dir = os.path.join(ss.sharedir,
                                    storage_index_to_dir(empty_si))
    fileutil.make_dirs(empty_bucket_dir)

    ss.setServiceParent(self.s)

    d = fireEventually()

    # now examine the state right after the first bucket has been
    # processed.
    def _after_first_bucket(ignored):
        s = lc.get_state()
        if "cycle-to-date" not in s:
            # the cycle has not started yet: look again on a later turn,
            # and return the new Deferred so the outer chain waits for it
            d2 = fireEventually()
            d2.addCallback(_after_first_bucket)
            return d2
        so_far = s["cycle-to-date"]
        rec = so_far["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 1)
        self.failUnlessEqual(rec["examined-shares"], 0)
        self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_bucket)

    d.addCallback(lambda ign: self.render_json(w))
    def _check_json(json):
        data = simplejson.loads(json)
        # grr. json turns all dict keys into strings.
        so_far = data["lease-checker"]["cycle-to-date"]
        corrupt_shares = so_far["corrupt-shares"]
        # it also turns all tuples into lists
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html)

    # poll until the lease checker has completed one full cycle
    def _wait():
        return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        s = lc.get_state()
        last = s["history"][0]
        rec = last["space-recovered"]
        # four share buckets plus the empty one; the corrupt share is not
        # counted among the examined shares
        self.failUnlessEqual(rec["examined-buckets"], 5)
        self.failUnlessEqual(rec["examined-shares"], 3)
        self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render_json(w))
    def _check_json_history(json):
        data = simplejson.loads(json)
        last = data["lease-checker"]["history"]["0"]
        corrupt_shares = last["corrupt-shares"]
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json_history)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html_history(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html_history)

    # The container-version errors are only logged while the crawler runs,
    # so flush them from inside the chain. addBoth runs this on success
    # and on failure, and passes the result through unchanged.
    def _cleanup(res):
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
        return res
    d.addBoth(_cleanup)
    return d
def render_json(self, page):
    # Render `page` through self.render1 with the query argument t=json
    # and return whatever render1 returns. Callers in this file chain
    # callbacks on that result, so it must be returned rather than dropped.
    return self.render1(page, args={"t": ["json"]})
class NoDiskStatsServer(StorageServer):
    # StorageServer whose get_disk_stats() always raises AttributeError.
    # test_status_no_disk_stats uses it to stand in for a platform that
    # has no disk stats API.
    def get_disk_stats(self):
        raise AttributeError()
class BadDiskStatsServer(StorageServer):
    # StorageServer whose get_disk_stats() always fails.
    # test_status_bad_disk_stats uses it to stand in for a platform where
    # the disk stats API exists but a call to it fails.
    def get_disk_stats(self):
        # NOTE(review): the method body was missing from this block.
        # OSError is assumed to be the failure that StorageServer reports
        # as "0 bytes available" -- confirm against
        # StorageServer.get_available_space().
        raise OSError
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    # Tests for the StorageStatus web page: its HTML and JSON renderings
    # under various StorageServer configurations.

    def setUp(self):
        # parent service that each test attaches its StorageServer to
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        # return stopService()'s result so trial can wait for shutdown
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # NOTE(review): assumes the server statistics live under the
            # "stats" key of the JSON document -- confirm against the
            # StorageStatus JSON renderer.
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        # hand the Deferred to trial so the test waits for the whole chain
        return d

    def render_json(self, page):
        # render `page` with t=json and return render1's result so that
        # callers can chain callbacks on it
        return self.render1(page, args={"t": ["json"]})

    def test_status_no_disk_stats(self):
        # Some platforms may have no disk stats API. Make sure the code can handle that
        # (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = NoDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    def test_status_bad_disk_stats(self):
        # If the API to get disk stats exists but a call to it fails, then the status should
        # show that no shares will be accepted, and get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = BadDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        # use a directory of its own; this test used to share
        # "storage/WebStatus/reserved" with test_reserved
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        # TODO(review): reserved_space is the same 10e6 that test_reserved
        # uses, so no "huge" value is exercised yet. Pick a larger value
        # once its expected rendering has been confirmed.
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_util(self):
        # small rendering helpers: None renders as "?", numbers render as
        # a plain or abbreviated byte count; remove_prefix returns None
        # when the prefix does not match
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)