2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
51 def make_workdir(self, name):
52 basedir = os.path.join("storage", "Bucket", name)
53 incoming = os.path.join(basedir, "tmp", "bucket")
54 final = os.path.join(basedir, "bucket")
55 fileutil.make_dirs(basedir)
56 fileutil.make_dirs(os.path.join(basedir, "tmp"))
57 return incoming, final
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnless("to peer" in repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
    def test_readwrite_v1(self):
        """Round-trip a share through the v1 bucket proxies (0x24-byte header)."""
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)
    def test_readwrite_v2(self):
        """Round-trip a share through the v2 bucket proxies (0x44-byte header)."""
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    """StorageServer whose disk-space probe reports a canned value.

    Tests set/adjust the DISKAVAIL attribute directly (see
    test_reserved_space) so space-accounting logic can be exercised
    without real statvfs() calls.
    """
    def stat_disk(self, d):
        # Ignore the path argument; report the test-controlled figure.
        return self.DISKAVAIL
234 class Server(unittest.TestCase):
237 self.sparent = LoggingServiceParent()
238 self.sparent.startService()
239 self._lease_secret = itertools.count()
241 return self.sparent.stopService()
243 def workdir(self, name):
244 basedir = os.path.join("storage", "Server", name)
247 def create(self, name, reserved_space=0, klass=StorageServer):
248 workdir = self.workdir(name)
249 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
250 stats_provider=FakeStatsProvider())
251 ss.setServiceParent(self.sparent)
    def test_create(self):
        """Smoke test: a StorageServer can be created (and service-parented
        by self.create) without raising."""
        ss = self.create("test_create")
257 def allocate(self, ss, storage_index, sharenums, size, canary=None):
258 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
261 canary = FakeCanary()
262 return ss.remote_allocate_buckets(storage_index,
263 renew_secret, cancel_secret,
264 sharenums, size, canary)
266 def test_large_share(self):
267 ss = self.create("test_large_share")
269 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
270 self.failUnlessEqual(already, set())
271 self.failUnlessEqual(set(writers.keys()), set([0]))
273 shnum, bucket = writers.items()[0]
274 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
275 bucket.remote_write(2**32, "ab")
276 bucket.remote_close()
278 readers = ss.remote_get_buckets("allocate")
279 reader = readers[shnum]
280 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
281 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
283 def test_dont_overfill_dirs(self):
285 This test asserts that if you add a second share whose storage index
286 share lots of leading bits with an extant share (but isn't the exact
287 same storage index), this won't add an entry to the share directory.
289 ss = self.create("test_dont_overfill_dirs")
290 already, writers = self.allocate(ss, "storageindex", [0], 10)
291 for i, wb in writers.items():
292 wb.remote_write(0, "%10d" % i)
294 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
296 children_of_storedir = set(os.listdir(storedir))
298 # Now store another one under another storageindex that has leading
299 # chars the same as the first storageindex.
300 already, writers = self.allocate(ss, "storageindey", [0], 10)
301 for i, wb in writers.items():
302 wb.remote_write(0, "%10d" % i)
304 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
306 new_children_of_storedir = set(os.listdir(storedir))
307 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
309 def test_remove_incoming(self):
310 ss = self.create("test_remove_incoming")
311 already, writers = self.allocate(ss, "vid", range(3), 10)
312 for i,wb in writers.items():
313 wb.remote_write(0, "%10d" % i)
315 incoming_share_dir = wb.incominghome
316 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
317 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
318 incoming_dir = os.path.dirname(incoming_prefix_dir)
319 self.failIf(os.path.exists(incoming_bucket_dir))
320 self.failIf(os.path.exists(incoming_prefix_dir))
321 self.failUnless(os.path.exists(incoming_dir))
323 def test_allocate(self):
324 ss = self.create("test_allocate")
326 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
328 canary = FakeCanary()
329 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330 self.failUnlessEqual(already, set())
331 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333 # while the buckets are open, they should not count as readable
334 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337 for i,wb in writers.items():
338 wb.remote_write(0, "%25d" % i)
340 # aborting a bucket that was already closed is a no-op
343 # now they should be readable
344 b = ss.remote_get_buckets("allocate")
345 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348 self.failUnless("BucketReader" in b_str, b_str)
349 self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
351 # now if we ask about writing again, the server should offer those
352 # three buckets as already present. It should offer them even if we
353 # don't ask about those specific ones.
354 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355 self.failUnlessEqual(already, set([0,1,2]))
356 self.failUnlessEqual(set(writers.keys()), set([3,4]))
358 # while those two buckets are open for writing, the server should
359 # refuse to offer them to uploaders
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 # aborting the writes should remove the tempfiles
366 for i,wb in writers2.items():
368 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369 self.failUnlessEqual(already2, set([0,1,2]))
370 self.failUnlessEqual(set(writers2.keys()), set([5]))
372 for i,wb in writers2.items():
374 for i,wb in writers.items():
377 def test_bad_container_version(self):
378 ss = self.create("test_bad_container_version")
379 a,w = self.allocate(ss, "si1", [0], 10)
380 w[0].remote_write(0, "\xff"*10)
383 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389 b = ss.remote_get_buckets("allocate")
391 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392 ss.remote_get_buckets, "si1")
393 self.failUnless(" had version 0 but we wanted 1" in str(e), e)
395 def test_disconnect(self):
396 # simulate a disconnection
397 ss = self.create("test_disconnect")
398 canary = FakeCanary()
399 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400 self.failUnlessEqual(already, set())
401 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402 for (f,args,kwargs) in canary.disconnectors.values():
407 # that ought to delete the incoming shares
408 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409 self.failUnlessEqual(already, set())
410 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412 def test_reserved_space(self):
413 ss = self.create("test_reserved_space", reserved_space=10000,
414 klass=FakeDiskStorageServer)
415 # the FakeDiskStorageServer doesn't do real statvfs() calls
417 # 15k available, 10k reserved, leaves 5k for shares
419 # a newly created and filled share incurs this much overhead, beyond
420 # the size we request.
422 LEASE_SIZE = 4+32+32+4
423 canary = FakeCanary(True)
424 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425 self.failUnlessEqual(len(writers), 3)
426 # now the StorageServer should have 3000 bytes provisionally
427 # allocated, allowing only 2000 more to be claimed
428 self.failUnlessEqual(len(ss._active_writers), 3)
430 # allocating 1001-byte shares only leaves room for one
431 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432 self.failUnlessEqual(len(writers2), 1)
433 self.failUnlessEqual(len(ss._active_writers), 4)
435 # we abandon the first set, so their provisional allocation should be
439 self.failUnlessEqual(len(ss._active_writers), 1)
440 # now we have a provisional allocation of 1001 bytes
442 # and we close the second set, so their provisional allocation should
443 # become real, long-term allocation, and grows to include the
445 for bw in writers2.values():
446 bw.remote_write(0, "a"*25)
451 self.failUnlessEqual(len(ss._active_writers), 0)
453 allocated = 1001 + OVERHEAD + LEASE_SIZE
455 # we have to manually increase DISKAVAIL, since we're not doing real
457 ss.DISKAVAIL -= allocated
459 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462 self.failUnlessEqual(len(writers3), 39)
463 self.failUnlessEqual(len(ss._active_writers), 39)
467 self.failUnlessEqual(len(ss._active_writers), 0)
468 ss.disownServiceParent()
472 basedir = self.workdir("test_seek_behavior")
473 fileutil.make_dirs(basedir)
474 filename = os.path.join(basedir, "testfile")
475 f = open(filename, "wb")
478 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
479 # files. mode="a" preserves previous contents but does not allow
480 # seeking-to-create-holes. mode="r+" allows both.
481 f = open(filename, "rb+")
485 filelen = os.stat(filename)[stat.ST_SIZE]
486 self.failUnlessEqual(filelen, 100+3)
487 f2 = open(filename, "rb")
488 self.failUnlessEqual(f2.read(5), "start")
491 def test_leases(self):
492 ss = self.create("test_leases")
493 canary = FakeCanary()
497 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
498 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
499 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
500 sharenums, size, canary)
501 self.failUnlessEqual(len(already), 0)
502 self.failUnlessEqual(len(writers), 5)
503 for wb in writers.values():
506 leases = list(ss.get_leases("si0"))
507 self.failUnlessEqual(len(leases), 1)
508 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
510 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
511 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
512 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
513 sharenums, size, canary)
514 for wb in writers.values():
517 # take out a second lease on si1
518 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
521 sharenums, size, canary)
522 self.failUnlessEqual(len(already), 5)
523 self.failUnlessEqual(len(writers), 0)
525 leases = list(ss.get_leases("si1"))
526 self.failUnlessEqual(len(leases), 2)
527 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
529 # and a third lease, using add-lease
530 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
531 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
532 ss.remote_add_lease("si1", rs2a, cs2a)
533 leases = list(ss.get_leases("si1"))
534 self.failUnlessEqual(len(leases), 3)
535 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
537 # add-lease on a missing storage index is silently ignored
538 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
540 # check that si0 is readable
541 readers = ss.remote_get_buckets("si0")
542 self.failUnlessEqual(len(readers), 5)
544 # renew the first lease. Only the proper renew_secret should work
545 ss.remote_renew_lease("si0", rs0)
546 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
547 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
549 # check that si0 is still readable
550 readers = ss.remote_get_buckets("si0")
551 self.failUnlessEqual(len(readers), 5)
554 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
555 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
556 ss.remote_cancel_lease("si0", cs0)
558 # si0 should now be gone
559 readers = ss.remote_get_buckets("si0")
560 self.failUnlessEqual(len(readers), 0)
561 # and the renew should no longer work
562 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
565 # cancel the first lease on si1, leaving the second and third in place
566 ss.remote_cancel_lease("si1", cs1)
567 readers = ss.remote_get_buckets("si1")
568 self.failUnlessEqual(len(readers), 5)
569 # the corresponding renew should no longer work
570 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
572 leases = list(ss.get_leases("si1"))
573 self.failUnlessEqual(len(leases), 2)
574 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
576 ss.remote_renew_lease("si1", rs2)
577 # cancelling the second and third should make it go away
578 ss.remote_cancel_lease("si1", cs2)
579 ss.remote_cancel_lease("si1", cs2a)
580 readers = ss.remote_get_buckets("si1")
581 self.failUnlessEqual(len(readers), 0)
582 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
583 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
584 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
586 leases = list(ss.get_leases("si1"))
587 self.failUnlessEqual(len(leases), 0)
590 # test overlapping uploads
591 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
592 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
593 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
594 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
595 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
596 sharenums, size, canary)
597 self.failUnlessEqual(len(already), 0)
598 self.failUnlessEqual(len(writers), 5)
599 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
600 sharenums, size, canary)
601 self.failUnlessEqual(len(already2), 0)
602 self.failUnlessEqual(len(writers2), 0)
603 for wb in writers.values():
606 leases = list(ss.get_leases("si3"))
607 self.failUnlessEqual(len(leases), 1)
609 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
610 sharenums, size, canary)
611 self.failUnlessEqual(len(already3), 5)
612 self.failUnlessEqual(len(writers3), 0)
614 leases = list(ss.get_leases("si3"))
615 self.failUnlessEqual(len(leases), 2)
617 def test_readonly(self):
618 workdir = self.workdir("test_readonly")
619 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
620 ss.setServiceParent(self.sparent)
622 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
623 self.failUnlessEqual(already, set())
624 self.failUnlessEqual(writers, {})
626 stats = ss.get_stats()
627 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
629 if "storage_server.disk_avail" in stats:
630 # windows does not have os.statvfs, so it doesn't give us disk
631 # stats. But if there are stats, readonly_storage means
633 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
635 def test_discard(self):
636 # discard is really only used for other tests, but we test it anyways
637 workdir = self.workdir("test_discard")
638 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
639 ss.setServiceParent(self.sparent)
641 canary = FakeCanary()
642 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
643 self.failUnlessEqual(already, set())
644 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
645 for i,wb in writers.items():
646 wb.remote_write(0, "%25d" % i)
648 # since we discard the data, the shares should be present but sparse.
649 # Since we write with some seeks, the data we read back will be all
651 b = ss.remote_get_buckets("vid")
652 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
653 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
655 def test_advise_corruption(self):
656 workdir = self.workdir("test_advise_corruption")
657 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
658 ss.setServiceParent(self.sparent)
660 si0_s = base32.b2a("si0")
661 ss.remote_advise_corrupt_share("immutable", "si0", 0,
662 "This share smells funny.\n")
663 reportdir = os.path.join(workdir, "corruption-advisories")
664 reports = os.listdir(reportdir)
665 self.failUnlessEqual(len(reports), 1)
666 report_si0 = reports[0]
667 self.failUnless(si0_s in report_si0, report_si0)
668 f = open(os.path.join(reportdir, report_si0), "r")
671 self.failUnless("type: immutable" in report)
672 self.failUnless(("storage_index: %s" % si0_s) in report)
673 self.failUnless("share_number: 0" in report)
674 self.failUnless("This share smells funny." in report)
676 # test the RIBucketWriter version too
677 si1_s = base32.b2a("si1")
678 already,writers = self.allocate(ss, "si1", [1], 75)
679 self.failUnlessEqual(already, set())
680 self.failUnlessEqual(set(writers.keys()), set([1]))
681 writers[1].remote_write(0, "data")
682 writers[1].remote_close()
684 b = ss.remote_get_buckets("si1")
685 self.failUnlessEqual(set(b.keys()), set([1]))
686 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
688 reports = os.listdir(reportdir)
689 self.failUnlessEqual(len(reports), 2)
690 report_si1 = [r for r in reports if si1_s in r][0]
691 f = open(os.path.join(reportdir, report_si1), "r")
694 self.failUnless("type: immutable" in report)
695 self.failUnless(("storage_index: %s" % si1_s) in report)
696 self.failUnless("share_number: 1" in report)
697 self.failUnless("This share tastes like dust." in report)
701 class MutableServer(unittest.TestCase):
704 self.sparent = LoggingServiceParent()
705 self._lease_secret = itertools.count()
707 return self.sparent.stopService()
709 def workdir(self, name):
710 basedir = os.path.join("storage", "MutableServer", name)
713 def create(self, name):
714 workdir = self.workdir(name)
715 ss = StorageServer(workdir, "\x00" * 20)
716 ss.setServiceParent(self.sparent)
    def test_create(self):
        """Smoke test: a StorageServer for the mutable-share tests can be
        created without raising."""
        ss = self.create("test_create")
722 def write_enabler(self, we_tag):
723 return hashutil.tagged_hash("we_blah", we_tag)
725 def renew_secret(self, tag):
726 return hashutil.tagged_hash("renew_blah", str(tag))
728 def cancel_secret(self, tag):
729 return hashutil.tagged_hash("cancel_blah", str(tag))
731 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
732 write_enabler = self.write_enabler(we_tag)
733 renew_secret = self.renew_secret(lease_tag)
734 cancel_secret = self.cancel_secret(lease_tag)
735 rstaraw = ss.remote_slot_testv_and_readv_and_writev
736 testandwritev = dict( [ (shnum, ([], [], None) )
737 for shnum in sharenums ] )
739 rc = rstaraw(storage_index,
740 (write_enabler, renew_secret, cancel_secret),
743 (did_write, readv_data) = rc
744 self.failUnless(did_write)
745 self.failUnless(isinstance(readv_data, dict))
746 self.failUnlessEqual(len(readv_data), 0)
748 def test_bad_magic(self):
749 ss = self.create("test_bad_magic")
750 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
751 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
756 read = ss.remote_slot_readv
757 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
758 read, "si1", [0], [(0,10)])
759 self.failUnless(" had magic " in str(e), e)
760 self.failUnless(" but we wanted " in str(e), e)
762 def test_container_size(self):
763 ss = self.create("test_container_size")
764 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
766 read = ss.remote_slot_readv
767 rstaraw = ss.remote_slot_testv_and_readv_and_writev
768 secrets = ( self.write_enabler("we1"),
769 self.renew_secret("we1"),
770 self.cancel_secret("we1") )
771 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
772 answer = rstaraw("si1", secrets,
773 {0: ([], [(0,data)], len(data)+12)},
775 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
777 # trying to make the container too large will raise an exception
778 TOOBIG = MutableShareFile.MAX_SIZE + 10
779 self.failUnlessRaises(DataTooLargeError,
780 rstaraw, "si1", secrets,
781 {0: ([], [(0,data)], TOOBIG)},
784 # it should be possible to make the container smaller, although at
785 # the moment this doesn't actually affect the share, unless the
786 # container size is dropped to zero, in which case the share is
788 answer = rstaraw("si1", secrets,
789 {0: ([], [(0,data)], len(data)+8)},
791 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
793 answer = rstaraw("si1", secrets,
794 {0: ([], [(0,data)], 0)},
796 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
798 read_answer = read("si1", [0], [(0,10)])
799 self.failUnlessEqual(read_answer, {})
801 def test_allocate(self):
802 ss = self.create("test_allocate")
803 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
806 read = ss.remote_slot_readv
807 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
809 self.failUnlessEqual(read("si1", [], [(0, 10)]),
810 {0: [""], 1: [""], 2: [""]})
811 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
815 secrets = ( self.write_enabler("we1"),
816 self.renew_secret("we1"),
817 self.cancel_secret("we1") )
818 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
819 write = ss.remote_slot_testv_and_readv_and_writev
820 answer = write("si1", secrets,
821 {0: ([], [(0,data)], None)},
823 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
825 self.failUnlessEqual(read("si1", [0], [(0,20)]),
826 {0: ["00000000001111111111"]})
827 self.failUnlessEqual(read("si1", [0], [(95,10)]),
829 #self.failUnlessEqual(s0.remote_get_length(), 100)
831 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
832 f = self.failUnlessRaises(BadWriteEnablerError,
833 write, "si1", bad_secrets,
835 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
837 # this testv should fail
838 answer = write("si1", secrets,
839 {0: ([(0, 12, "eq", "444444444444"),
840 (20, 5, "eq", "22222"),
847 self.failUnlessEqual(answer, (False,
848 {0: ["000000000011", "22222"],
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
855 answer = write("si1", secrets,
856 {0: ([(10, 5, "lt", "11111"),
863 self.failUnlessEqual(answer, (False,
868 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
871 def test_operators(self):
872 # test operators, the data we're comparing is '11111' in all cases.
873 # test both fail+pass, reset data after each one.
874 ss = self.create("test_operators")
876 secrets = ( self.write_enabler("we1"),
877 self.renew_secret("we1"),
878 self.cancel_secret("we1") )
879 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
880 write = ss.remote_slot_testv_and_readv_and_writev
881 read = ss.remote_slot_readv
884 write("si1", secrets,
885 {0: ([], [(0,data)], None)},
891 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
896 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
901 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
906 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
910 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
915 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
920 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
925 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
929 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
934 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
938 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
943 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
948 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
953 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
957 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
962 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
967 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
972 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
976 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
981 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
986 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
991 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
995 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1000 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1004 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1009 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1014 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1019 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1023 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1028 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1032 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1037 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1041 # finally, test some operators against empty shares
1042 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1047 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
def test_readv(self):
    """Write three shares of one mutable slot, then read a slice of every
    share in a single remote_slot_readv call.

    An empty wanted-shares list passed to remote_slot_readv means "read
    from all shares"; the read vector [(0, 10)] asks for the first ten
    bytes of each share.
    """
    ss = self.create("test_readv")
    secrets = ( self.write_enabler("we1"),
                self.renew_secret("we1"),
                self.cancel_secret("we1") )
    write = ss.remote_slot_testv_and_readv_and_writev
    read = ss.remote_slot_readv
    # one distinctive 100-byte body per share: "000...", "111...", "222..."
    # (the original also built a throwaway string here that was immediately
    # overwritten; that dead assignment has been removed)
    data = [("%d" % i) * 100 for i in range(3)]
    rc = write("si1", secrets,
               {0: ([], [(0, data[0])], None),
                1: ([], [(0, data[1])], None),
                2: ([], [(0, data[2])], None),
                }, [])
    self.failUnlessEqual(rc, (True, {}))

    # [] for the share list means "every share of si1"
    answer = read("si1", [], [(0, 10)])
    self.failUnlessEqual(answer, {0: ["0"*10],
                                  1: ["1"*10],
                                  2: ["2"*10]})
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """Assert two lease lists are pairwise identical, ignoring expiration
    times (compare_leases checks those too).

    Each lease is compared on owner_num, renew_secret, cancel_secret and
    nodeid. Raises the TestCase failure exception on any mismatch.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip is safe here: the length check above guarantees equal lengths
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """Assert two lease lists are pairwise identical, INCLUDING their
    expiration times (the strict variant of
    compare_leases_without_timestamps).
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip is safe here: the length check above guarantees equal lengths
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
1093 def test_leases(self):
1094 ss = self.create("test_leases")
1096 return ( self.write_enabler("we1"),
1097 self.renew_secret("we1-%d" % n),
1098 self.cancel_secret("we1-%d" % n) )
1099 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100 write = ss.remote_slot_testv_and_readv_and_writev
1101 read = ss.remote_slot_readv
1102 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103 self.failUnlessEqual(rc, (True, {}))
1105 # create a random non-numeric file in the bucket directory, to
1106 # exercise the code that's supposed to ignore those.
1107 bucket_dir = os.path.join(self.workdir("test_leases"),
1108 "shares", storage_index_to_dir("si1"))
1109 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110 f.write("you ought to be ignoring me\n")
1113 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1116 # add-lease on a missing storage index is silently ignored
1117 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1119 # re-allocate the slots and use the same secrets, that should update
1121 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1125 ss.remote_renew_lease("si1", secrets(0)[1])
1126 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1128 # now allocate them with a bunch of different secrets, to trigger the
1129 # extended lease code. Use add_lease for one of them.
1130 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132 secrets2 = secrets(2)
1133 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1139 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1141 # cancel one of them
1142 ss.remote_cancel_lease("si1", secrets(5)[2])
1143 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1145 all_leases = list(s0.get_leases())
1146 # and write enough data to expand the container, forcing the server
1147 # to move the leases
1148 write("si1", secrets(0),
1149 {0: ([], [(0,data)], 200), },
1152 # read back the leases, make sure they're still intact.
1153 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1155 ss.remote_renew_lease("si1", secrets(0)[1])
1156 ss.remote_renew_lease("si1", secrets(1)[1])
1157 ss.remote_renew_lease("si1", secrets(2)[1])
1158 ss.remote_renew_lease("si1", secrets(3)[1])
1159 ss.remote_renew_lease("si1", secrets(4)[1])
1160 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161 # get a new copy of the leases, with the current timestamps. Reading
1162 # data and failing to renew/cancel leases should leave the timestamps
1164 all_leases = list(s0.get_leases())
1165 # renewing with a bogus token should prompt an error message
1167 # examine the exception thus raised, make sure the old nodeid is
1168 # present, to provide for share migration
1169 e = self.failUnlessRaises(IndexError,
1170 ss.remote_renew_lease, "si1",
1173 self.failUnless("Unable to renew non-existent lease" in e_s)
1174 self.failUnless("I have leases accepted by nodeids:" in e_s)
1175 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1177 # same for cancelling
1178 self.failUnlessRaises(IndexError,
1179 ss.remote_cancel_lease, "si1",
1181 self.compare_leases(all_leases, list(s0.get_leases()))
1183 # reading shares should not modify the timestamp
1184 read("si1", [], [(0,200)])
1185 self.compare_leases(all_leases, list(s0.get_leases()))
1187 write("si1", secrets(0),
1188 {0: ([], [(200, "make me bigger")], None)}, [])
1189 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1191 write("si1", secrets(0),
1192 {0: ([], [(500, "make me really bigger")], None)}, [])
1193 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1195 # now cancel them all
1196 ss.remote_cancel_lease("si1", secrets(0)[2])
1197 ss.remote_cancel_lease("si1", secrets(1)[2])
1198 ss.remote_cancel_lease("si1", secrets(2)[2])
1199 ss.remote_cancel_lease("si1", secrets(3)[2])
1201 # the slot should still be there
1202 remaining_shares = read("si1", [], [(0,10)])
1203 self.failUnlessEqual(len(remaining_shares), 1)
1204 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1206 # cancelling a non-existent lease should raise an IndexError
1207 self.failUnlessRaises(IndexError,
1208 ss.remote_cancel_lease, "si1", "nonsecret")
1210 # and the slot should still be there
1211 remaining_shares = read("si1", [], [(0,10)])
1212 self.failUnlessEqual(len(remaining_shares), 1)
1213 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1215 ss.remote_cancel_lease("si1", secrets(4)[2])
1216 # now the slot should be gone
1217 no_shares = read("si1", [], [(0,10)])
1218 self.failUnlessEqual(no_shares, {})
1220 # cancelling a lease on a non-existent share should raise an IndexError
1221 self.failUnlessRaises(IndexError,
1222 ss.remote_cancel_lease, "si2", "nonsecret")
# Exercise mutable-share deletion: shrinking a share to size zero removes
# it, and removing the last share deletes the bucket directory itself.
# NOTE(review): several original lines are missing from this excerpt (e.g.
# 1227, 1235-1236, 1238, 1242-1243, 1246-1247, 1250-1251, 1254-1255, 1258,
# 1263) -- the writev test/write vectors and some expected readv results
# are not visible here. Comments below describe only what is shown.
1224 def test_remove(self):
1225 ss = self.create("test_remove")
1226 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1228 readv = ss.remote_slot_readv
1229 writev = ss.remote_slot_testv_and_readv_and_writev
1230 secrets = ( self.write_enabler("we1"),
1231 self.renew_secret("we1"),
1232 self.cancel_secret("we1") )
1233 # delete sh0 by setting its size to zero
1234 answer = writev("si1", secrets,
1237 # the answer should mention all the shares that existed before the
1239 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240 # but a new read should show only sh1 and sh2
1241 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1244 # delete sh1 by setting its size to zero
1245 answer = writev("si1", secrets,
1248 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1252 # delete sh2 by setting its size to zero
1253 answer = writev("si1", secrets,
1256 self.failUnlessEqual(answer, (True, {2:[]}) )
1257 self.failUnlessEqual(readv("si1", [], [(0,10)]),
# once the final share is gone, the server is expected to unlink the
# bucket directory but keep the two-letter prefix directory around:
1259 # and the bucket directory should now be gone
1260 si = base32.b2a("si1")
1261 # note: this is a detail of the storage server implementation, and
1262 # may change in the future
1264 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265 bucketdir = os.path.join(prefixdir, si)
1266 self.failUnless(os.path.exists(prefixdir))
1267 self.failIf(os.path.exists(bucketdir))
1269 class Stats(unittest.TestCase):
1272 self.sparent = LoggingServiceParent()
1273 self._lease_secret = itertools.count()
1275 return self.sparent.stopService()
1277 def workdir(self, name):
1278 basedir = os.path.join("storage", "Server", name)
1281 def create(self, name):
1282 workdir = self.workdir(name)
1283 ss = StorageServer(workdir, "\x00" * 20)
1284 ss.setServiceParent(self.sparent)
1287 def test_latencies(self):
1288 ss = self.create("test_latencies")
1289 for i in range(10000):
1290 ss.add_latency("allocate", 1.0 * i)
1291 for i in range(1000):
1292 ss.add_latency("renew", 1.0 * i)
1294 ss.add_latency("cancel", 2.0 * i)
1295 ss.add_latency("get", 5.0)
1297 output = ss.get_latencies()
1299 self.failUnlessEqual(sorted(output.keys()),
1300 sorted(["allocate", "renew", "cancel", "get"]))
1301 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1311 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1314 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1321 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1324 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1325 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1331 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
def remove_tags(s):
    """Strip HTML tags from *s* and collapse whitespace runs to single
    spaces, so tests can assert on the text of rendered status pages.

    Note: the surrounding `def`/`return` lines were dropped from this
    excerpt; this restores the obvious two-substitution wrapper used by
    the `remove_tags(html)` calls later in the file.
    """
    s = re.sub(r'<[^>]*>', ' ', s)   # every tag becomes one space
    s = re.sub(r'\s+', ' ', s)       # then squeeze whitespace runs
    return s
# BucketCountingCrawler subclass for tests: after the superclass finishes a
# prefix, it pops the next Deferred from self.hook_ds so a test callback can
# run assertions at exactly that point in the crawl (hook_ds is populated by
# the test, see test_bucket_counter_eta).
# NOTE(review): original lines 1349 and 1351-1352 are missing from this
# excerpt -- presumably a guard on self.hook_ds being non-empty and the
# firing of `d`; confirm against the full file.
1346 class MyBucketCountingCrawler(BucketCountingCrawler):
1347 def finished_prefix(self, cycle, prefix):
1348 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1350 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer variant whose bucket counter is the hook-enabled
    MyBucketCountingCrawler, so tests can wait on per-prefix deferreds."""

    def add_bucket_counter(self):
        # install the instrumented crawler instead of the stock one
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
1359 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1362 self.s = service.MultiService()
1363 self.s.startService()
1365 return self.s.stopService()
1367 def test_bucket_counter(self):
1368 basedir = "storage/BucketCounter/bucket_counter"
1369 fileutil.make_dirs(basedir)
1370 ss = StorageServer(basedir, "\x00" * 20)
1371 # to make sure we capture the bucket-counting-crawler in the middle
1372 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1373 # also make it start sooner than usual.
1374 ss.bucket_counter.slow_start = 0
1375 orig_cpu_slice = ss.bucket_counter.cpu_slice
1376 ss.bucket_counter.cpu_slice = 0
1377 ss.setServiceParent(self.s)
1379 w = StorageStatus(ss)
1381 # this sample is before the crawler has started doing anything
1382 html = w.renderSynchronously()
1383 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384 s = remove_tags(html)
1385 self.failUnless("Accepting new shares: Yes" in s, s)
1386 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387 self.failUnless("Total buckets: Not computed yet" in s, s)
1388 self.failUnless("Next crawl in" in s, s)
1390 # give the bucket-counting-crawler one tick to get started. The
1391 # cpu_slice=0 will force it to yield right after it processes the
1394 d = eventual.fireEventually()
1395 def _check(ignored):
1396 # are we really right after the first prefix?
1397 state = ss.bucket_counter.get_state()
1398 self.failUnlessEqual(state["last-complete-prefix"],
1399 ss.bucket_counter.prefixes[0])
1400 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401 html = w.renderSynchronously()
1402 s = remove_tags(html)
1403 self.failUnless(" Current crawl " in s, s)
1404 self.failUnless(" (next work in " in s, s)
1405 d.addCallback(_check)
1407 # now give it enough time to complete a full cycle
1409 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410 d.addCallback(lambda ignored: self.poll(_watch))
1411 def _check2(ignored):
1412 ss.bucket_counter.cpu_slice = orig_cpu_slice
1413 html = w.renderSynchronously()
1414 s = remove_tags(html)
1415 self.failUnless("Total buckets: 0 (the number of" in s, s)
1416 self.failUnless("Next crawl in 59 minutes" in s, s)
1417 d.addCallback(_check2)
1420 def test_bucket_counter_cleanup(self):
1421 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422 fileutil.make_dirs(basedir)
1423 ss = StorageServer(basedir, "\x00" * 20)
1424 # to make sure we capture the bucket-counting-crawler in the middle
1425 # of a cycle, we reach in and reduce its maximum slice time to 0.
1426 ss.bucket_counter.slow_start = 0
1427 orig_cpu_slice = ss.bucket_counter.cpu_slice
1428 ss.bucket_counter.cpu_slice = 0
1429 ss.setServiceParent(self.s)
1431 d = eventual.fireEventually()
1433 def _after_first_prefix(ignored):
1434 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435 # now sneak in and mess with its state, to make sure it cleans up
1436 # properly at the end of the cycle
1437 state = ss.bucket_counter.state
1438 self.failUnlessEqual(state["last-complete-prefix"],
1439 ss.bucket_counter.prefixes[0])
1440 state["bucket-counts"][-12] = {}
1441 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442 ss.bucket_counter.save_state()
1443 d.addCallback(_after_first_prefix)
1445 # now give it enough time to complete a cycle
1447 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448 d.addCallback(lambda ignored: self.poll(_watch))
1449 def _check2(ignored):
1450 ss.bucket_counter.cpu_slice = orig_cpu_slice
1451 s = ss.bucket_counter.get_state()
1452 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453 self.failIf("bogusprefix!" in s["storage-index-samples"],
1454 s["storage-index-samples"].keys())
1455 d.addCallback(_check2)
1458 def test_bucket_counter_eta(self):
1459 basedir = "storage/BucketCounter/bucket_counter_eta"
1460 fileutil.make_dirs(basedir)
1461 ss = MyStorageServer(basedir, "\x00" * 20)
1462 ss.bucket_counter.slow_start = 0
1463 # these will be fired inside finished_prefix()
1464 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465 w = StorageStatus(ss)
1467 d = defer.Deferred()
1469 def _check_1(ignored):
1470 # no ETA is available yet
1471 html = w.renderSynchronously()
1472 s = remove_tags(html)
1473 self.failUnlessIn("complete (next work", s)
1475 def _check_2(ignored):
1476 # one prefix has finished, so an ETA based upon that elapsed time
1477 # should be available.
1478 html = w.renderSynchronously()
1479 s = remove_tags(html)
1480 self.failUnlessIn("complete (ETA ", s)
1482 def _check_3(ignored):
1483 # two prefixes have finished
1484 html = w.renderSynchronously()
1485 s = remove_tags(html)
1486 self.failUnlessIn("complete (ETA ", s)
1489 hooks[0].addCallback(_check_1).addErrback(d.errback)
1490 hooks[1].addCallback(_check_2).addErrback(d.errback)
1491 hooks[2].addCallback(_check_3).addErrback(d.errback)
1493 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that can park itself right after one bucket.

    With stop_after_first_bucket set, the crawler processes a single
    bucket and then forces an immediate yield (cpu_slice = -1.0);
    yielding() later restores a generous slice so the crawl can resume.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # consume the one-shot flag and make the crawler give up its slice
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
1507 class BrokenStatResults:
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    """Crawler whose stat results lack st_blocks, simulating platforms
    where block counts are unavailable.

    NOTE(review): the method header, the os.stat call, the `continue`
    statements and the `return` were dropped from this excerpt; they are
    reconstructed here from the visible filter loop -- confirm the method
    name (`stat`) against the full file.
    """
    def stat(self, fn):
        s = os.stat(fn)
        bsr = BrokenStatResults()
        for attrname in dir(s):
            if attrname.startswith("_"):
                continue          # skip dunder/private machinery
            if attrname == "st_blocks":
                continue          # deliberately withhold block counts
            setattr(bsr, attrname, getattr(s, attrname))
        return bsr
class InstrumentedStorageServer(StorageServer):
    """StorageServer wired to the stop-after-first-bucket lease crawler."""
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer whose lease crawler sees stat results without
    st_blocks."""
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1526 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1529 self.s = service.MultiService()
1530 self.s.startService()
1532 return self.s.stopService()
1534 def make_shares(self, ss):
1536 return (si, hashutil.tagged_hash("renew", si),
1537 hashutil.tagged_hash("cancel", si))
1538 def make_mutable(si):
1539 return (si, hashutil.tagged_hash("renew", si),
1540 hashutil.tagged_hash("cancel", si),
1541 hashutil.tagged_hash("write-enabler", si))
1542 def make_extra_lease(si, num):
1543 return (hashutil.tagged_hash("renew-%d" % num, si),
1544 hashutil.tagged_hash("cancel-%d" % num, si))
1546 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1553 canary = FakeCanary()
1554 # note: 'tahoe debug dump-share' will not handle this file, since the
1555 # inner contents are not a valid CHK share
1556 data = "\xff" * 1000
1558 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1560 w[0].remote_write(0, data)
1563 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1565 w[0].remote_write(0, data)
1567 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1569 writev = ss.remote_slot_testv_and_readv_and_writev
1570 writev(mutable_si_2, (we2, rs2, cs2),
1571 {0: ([], [(0,data)], len(data))}, [])
1572 writev(mutable_si_3, (we3, rs3, cs3),
1573 {0: ([], [(0,data)], len(data))}, [])
1574 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1576 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1580 def test_basic(self):
1581 basedir = "storage/LeaseCrawler/basic"
1582 fileutil.make_dirs(basedir)
1583 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584 # make it start sooner than usual.
1585 lc = ss.lease_checker
1588 lc.stop_after_first_bucket = True
1589 webstatus = StorageStatus(ss)
1591 # create a few shares, with some leases on them
1592 self.make_shares(ss)
1593 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1595 # add a non-sharefile to exercise another code path
1596 fn = os.path.join(ss.sharedir,
1597 storage_index_to_dir(immutable_si_0),
1600 f.write("I am not a share.\n")
1603 # this is before the crawl has started, so we're not in a cycle yet
1604 initial_state = lc.get_state()
1605 self.failIf(lc.get_progress()["cycle-in-progress"])
1606 self.failIf("cycle-to-date" in initial_state)
1607 self.failIf("estimated-remaining-cycle" in initial_state)
1608 self.failIf("estimated-current-cycle" in initial_state)
1609 self.failUnless("history" in initial_state)
1610 self.failUnlessEqual(initial_state["history"], {})
1612 ss.setServiceParent(self.s)
1616 d = eventual.fireEventually()
1618 # now examine the state right after the first bucket has been
1620 def _after_first_bucket(ignored):
1621 initial_state = lc.get_state()
1622 self.failUnless("cycle-to-date" in initial_state)
1623 self.failUnless("estimated-remaining-cycle" in initial_state)
1624 self.failUnless("estimated-current-cycle" in initial_state)
1625 self.failUnless("history" in initial_state)
1626 self.failUnlessEqual(initial_state["history"], {})
1628 so_far = initial_state["cycle-to-date"]
1629 self.failUnlessEqual(so_far["expiration-enabled"], False)
1630 self.failUnless("configured-expiration-mode" in so_far)
1631 self.failUnless("lease-age-histogram" in so_far)
1632 lah = so_far["lease-age-histogram"]
1633 self.failUnlessEqual(type(lah), list)
1634 self.failUnlessEqual(len(lah), 1)
1635 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637 self.failUnlessEqual(so_far["corrupt-shares"], [])
1638 sr1 = so_far["space-recovered"]
1639 self.failUnlessEqual(sr1["examined-buckets"], 1)
1640 self.failUnlessEqual(sr1["examined-shares"], 1)
1641 self.failUnlessEqual(sr1["actual-shares"], 0)
1642 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1643 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1644 left = initial_state["estimated-remaining-cycle"]
1645 sr2 = left["space-recovered"]
1646 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1647 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1648 self.failIfEqual(sr2["actual-shares"], None)
1649 self.failIfEqual(sr2["configured-diskbytes"], None)
1650 self.failIfEqual(sr2["original-sharebytes"], None)
1651 d.addCallback(_after_first_bucket)
1652 d.addCallback(lambda ign: self.render1(webstatus))
1653 def _check_html_in_cycle(html):
1654 s = remove_tags(html)
1655 self.failUnlessIn("So far, this cycle has examined "
1656 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1657 self.failUnlessIn("and has recovered: "
1658 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1659 "0 B (0 B / 0 B)", s)
1660 self.failUnlessIn("If expiration were enabled, "
1661 "we would have recovered: "
1662 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1663 " 0 B (0 B / 0 B) by now", s)
1664 self.failUnlessIn("and the remainder of this cycle "
1665 "would probably recover: "
1666 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1667 " 0 B (0 B / 0 B)", s)
1668 self.failUnlessIn("and the whole cycle would probably recover: "
1669 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1670 " 0 B (0 B / 0 B)", s)
1671 self.failUnlessIn("if we were using each lease's default "
1672 "31-day lease lifetime", s)
1673 self.failUnlessIn("this cycle would be expected to recover: ", s)
1674 d.addCallback(_check_html_in_cycle)
1676 # wait for the crawler to finish the first cycle. Nothing should have
1679 return bool(lc.get_state()["last-cycle-finished"] is not None)
1680 d.addCallback(lambda ign: self.poll(_wait))
1682 def _after_first_cycle(ignored):
1684 self.failIf("cycle-to-date" in s)
1685 self.failIf("estimated-remaining-cycle" in s)
1686 self.failIf("estimated-current-cycle" in s)
1687 last = s["history"][0]
1688 self.failUnless("cycle-start-finish-times" in last)
1689 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1690 self.failUnlessEqual(last["expiration-enabled"], False)
1691 self.failUnless("configured-expiration-mode" in last)
1693 self.failUnless("lease-age-histogram" in last)
1694 lah = last["lease-age-histogram"]
1695 self.failUnlessEqual(type(lah), list)
1696 self.failUnlessEqual(len(lah), 1)
1697 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1699 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1700 self.failUnlessEqual(last["corrupt-shares"], [])
1702 rec = last["space-recovered"]
1703 self.failUnlessEqual(rec["examined-buckets"], 4)
1704 self.failUnlessEqual(rec["examined-shares"], 4)
1705 self.failUnlessEqual(rec["actual-buckets"], 0)
1706 self.failUnlessEqual(rec["original-buckets"], 0)
1707 self.failUnlessEqual(rec["configured-buckets"], 0)
1708 self.failUnlessEqual(rec["actual-shares"], 0)
1709 self.failUnlessEqual(rec["original-shares"], 0)
1710 self.failUnlessEqual(rec["configured-shares"], 0)
1711 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1712 self.failUnlessEqual(rec["original-diskbytes"], 0)
1713 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1714 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1715 self.failUnlessEqual(rec["original-sharebytes"], 0)
1716 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1718 def _get_sharefile(si):
1719 return list(ss._iter_share_files(si))[0]
1720 def count_leases(si):
1721 return len(list(_get_sharefile(si).get_leases()))
1722 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1723 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1724 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1725 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1726 d.addCallback(_after_first_cycle)
1727 d.addCallback(lambda ign: self.render1(webstatus))
1728 def _check_html(html):
1729 s = remove_tags(html)
1730 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1731 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1732 "but expiration was not enabled", s)
1733 d.addCallback(_check_html)
def backdate_lease(self, sf, renew_secret, new_expire_time):
    """Force the lease matching *renew_secret* to expire at
    *new_expire_time*.

    ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
    "renew" with a new_expire_time older than the current one), so tests
    reach inside the share file and rewrite the lease record directly.

    Raises IndexError if no lease on *sf* carries *renew_secret*.
    """
    for i, lease in enumerate(sf.get_leases()):
        if lease.renew_secret == renew_secret:
            lease.expiration_time = new_expire_time
            # rewrite the i'th lease record in place; the with-block
            # guarantees the handle is closed even if the write fails
            with open(sf.home, 'rb+') as f:
                sf._write_lease_record(f, i, lease)
            return
    raise IndexError("unable to renew non-existent lease")
1749 def test_expire_age(self):
1750 basedir = "storage/LeaseCrawler/expire_age"
1751 fileutil.make_dirs(basedir)
1752 # setting expiration_time to 2000 means that any lease which is more
1753 # than 2000s old will be expired.
1754 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1755 expiration_enabled=True,
1756 expiration_mode=("age",2000))
1757 # make it start sooner than usual.
1758 lc = ss.lease_checker
1760 lc.stop_after_first_bucket = True
1761 webstatus = StorageStatus(ss)
1763 # create a few shares, with some leases on them
1764 self.make_shares(ss)
1765 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1767 def count_shares(si):
1768 return len(list(ss._iter_share_files(si)))
1769 def _get_sharefile(si):
1770 return list(ss._iter_share_files(si))[0]
1771 def count_leases(si):
1772 return len(list(_get_sharefile(si).get_leases()))
1774 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1775 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1776 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1777 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1778 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1779 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1780 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1781 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1783 # artificially crank back the expiration time on the first lease of
1784 # each share, to make it look like it expired already (age=1000s).
1785 # Some shares have an extra lease which is set to expire at the
1786 # default time in 31 days from now (age=31days). We then run the
1787 # crawler, which will expire the first lease, making some shares get
1788 # deleted and others stay alive (with one remaining lease)
1791 sf0 = _get_sharefile(immutable_si_0)
1792 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1793 sf0_size = os.stat(sf0.home).st_size
1795 # immutable_si_1 gets an extra lease
1796 sf1 = _get_sharefile(immutable_si_1)
1797 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1799 sf2 = _get_sharefile(mutable_si_2)
1800 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1801 sf2_size = os.stat(sf2.home).st_size
1803 # mutable_si_3 gets an extra lease
1804 sf3 = _get_sharefile(mutable_si_3)
1805 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1807 ss.setServiceParent(self.s)
1809 d = eventual.fireEventually()
1810 # examine the state right after the first bucket has been processed
1811 def _after_first_bucket(ignored):
1812 p = lc.get_progress()
1813 self.failUnless(p["cycle-in-progress"])
1814 d.addCallback(_after_first_bucket)
1815 d.addCallback(lambda ign: self.render1(webstatus))
1816 def _check_html_in_cycle(html):
1817 s = remove_tags(html)
1818 # the first bucket encountered gets deleted, and its prefix
1819 # happens to be about 1/5th of the way through the ring, so the
1820 # predictor thinks we'll have 5 shares and that we'll delete them
1821 # all. This part of the test depends upon the SIs landing right
1822 # where they do now.
1823 self.failUnlessIn("The remainder of this cycle is expected to "
1824 "recover: 4 shares, 4 buckets", s)
1825 self.failUnlessIn("The whole cycle is expected to examine "
1826 "5 shares in 5 buckets and to recover: "
1827 "5 shares, 5 buckets", s)
1828 d.addCallback(_check_html_in_cycle)
1830 # wait for the crawler to finish the first cycle. Two shares should
1833 return bool(lc.get_state()["last-cycle-finished"] is not None)
1834 d.addCallback(lambda ign: self.poll(_wait))
1836 def _after_first_cycle(ignored):
1837 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1838 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1839 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1840 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1841 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1842 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1845 last = s["history"][0]
1847 self.failUnlessEqual(last["expiration-enabled"], True)
1848 self.failUnlessEqual(last["configured-expiration-mode"], ("age",2000))
1849 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1851 rec = last["space-recovered"]
1852 self.failUnlessEqual(rec["examined-buckets"], 4)
1853 self.failUnlessEqual(rec["examined-shares"], 4)
1854 self.failUnlessEqual(rec["actual-buckets"], 2)
1855 self.failUnlessEqual(rec["original-buckets"], 2)
1856 self.failUnlessEqual(rec["configured-buckets"], 2)
1857 self.failUnlessEqual(rec["actual-shares"], 2)
1858 self.failUnlessEqual(rec["original-shares"], 2)
1859 self.failUnlessEqual(rec["configured-shares"], 2)
1860 size = sf0_size + sf2_size
1861 self.failUnlessEqual(rec["actual-sharebytes"], size)
1862 self.failUnlessEqual(rec["original-sharebytes"], size)
1863 self.failUnlessEqual(rec["configured-sharebytes"], size)
1864 # different platforms have different notions of "blocks used by
1865 # this file", so merely assert that it's a number
1866 self.failUnless(rec["actual-diskbytes"] >= 0,
1867 rec["actual-diskbytes"])
1868 self.failUnless(rec["original-diskbytes"] >= 0,
1869 rec["original-diskbytes"])
1870 self.failUnless(rec["configured-diskbytes"] >= 0,
1871 rec["configured-diskbytes"])
1872 d.addCallback(_after_first_cycle)
1873 d.addCallback(lambda ign: self.render1(webstatus))
1874 def _check_html(html):
1875 s = remove_tags(html)
1876 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1877 self.failUnlessIn("leases created or last renewed more than 33 minutes ago will be considered expired", s)
1878 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1879 d.addCallback(_check_html)
1882 def test_expire_date_cutoff(self):
1883 basedir = "storage/LeaseCrawler/expire_date_cutoff"
1884 fileutil.make_dirs(basedir)
1885 # setting date-cutoff to 2000 seconds ago means that any lease which
1886 # is more than 2000s old will be expired.
1888 then = int(now - 2000)
1889 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1890 expiration_enabled=True,
1891 expiration_mode=("date-cutoff",then))
1892 # make it start sooner than usual.
1893 lc = ss.lease_checker
1895 lc.stop_after_first_bucket = True
1896 webstatus = StorageStatus(ss)
1898 # create a few shares, with some leases on them
1899 self.make_shares(ss)
1900 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1902 def count_shares(si):
1903 return len(list(ss._iter_share_files(si)))
1904 def _get_sharefile(si):
1905 return list(ss._iter_share_files(si))[0]
1906 def count_leases(si):
1907 return len(list(_get_sharefile(si).get_leases()))
1909 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1910 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1911 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1912 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1913 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1914 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1915 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1916 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1918 # artificially crank back the expiration time on the first lease of
1919 # each share, to make it look like was renewed 3000s ago. To achieve
1920 # this, we need to set the expiration time to now-3000+31days. This
1921 # will change when the lease format is improved to contain both
1922 # create/renew time and duration.
1923 new_expiration_time = now - 3000 + 31*24*60*60
1925 # Some shares have an extra lease which is set to expire at the
1926 # default time in 31 days from now (age=31days). We then run the
1927 # crawler, which will expire the first lease, making some shares get
1928 # deleted and others stay alive (with one remaining lease)
1930 sf0 = _get_sharefile(immutable_si_0)
1931 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1932 sf0_size = os.stat(sf0.home).st_size
1934 # immutable_si_1 gets an extra lease
1935 sf1 = _get_sharefile(immutable_si_1)
1936 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1938 sf2 = _get_sharefile(mutable_si_2)
1939 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1940 sf2_size = os.stat(sf2.home).st_size
1942 # mutable_si_3 gets an extra lease
1943 sf3 = _get_sharefile(mutable_si_3)
1944 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1946 ss.setServiceParent(self.s)
1948 d = eventual.fireEventually()
1949 # examine the state right after the first bucket has been processed
1950 def _after_first_bucket(ignored):
1951 p = lc.get_progress()
1952 self.failUnless(p["cycle-in-progress"])
1953 d.addCallback(_after_first_bucket)
1954 d.addCallback(lambda ign: self.render1(webstatus))
1955 def _check_html_in_cycle(html):
1956 s = remove_tags(html)
1957 # the first bucket encountered gets deleted, and its prefix
1958 # happens to be about 1/5th of the way through the ring, so the
1959 # predictor thinks we'll have 5 shares and that we'll delete them
1960 # all. This part of the test depends upon the SIs landing right
1961 # where they do now.
1962 self.failUnlessIn("The remainder of this cycle is expected to "
1963 "recover: 4 shares, 4 buckets", s)
1964 self.failUnlessIn("The whole cycle is expected to examine "
1965 "5 shares in 5 buckets and to recover: "
1966 "5 shares, 5 buckets", s)
1967 d.addCallback(_check_html_in_cycle)
1969 # wait for the crawler to finish the first cycle. Two shares should
1972 return bool(lc.get_state()["last-cycle-finished"] is not None)
1973 d.addCallback(lambda ign: self.poll(_wait))
1975 def _after_first_cycle(ignored):
1976 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1977 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1978 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1979 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1980 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1981 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1984 last = s["history"][0]
1986 self.failUnlessEqual(last["expiration-enabled"], True)
1987 self.failUnlessEqual(last["configured-expiration-mode"],
1988 ("date-cutoff",then))
1989 self.failUnlessEqual(last["leases-per-share-histogram"],
1992 rec = last["space-recovered"]
1993 self.failUnlessEqual(rec["examined-buckets"], 4)
1994 self.failUnlessEqual(rec["examined-shares"], 4)
1995 self.failUnlessEqual(rec["actual-buckets"], 2)
1996 self.failUnlessEqual(rec["original-buckets"], 0)
1997 self.failUnlessEqual(rec["configured-buckets"], 2)
1998 self.failUnlessEqual(rec["actual-shares"], 2)
1999 self.failUnlessEqual(rec["original-shares"], 0)
2000 self.failUnlessEqual(rec["configured-shares"], 2)
2001 size = sf0_size + sf2_size
2002 self.failUnlessEqual(rec["actual-sharebytes"], size)
2003 self.failUnlessEqual(rec["original-sharebytes"], 0)
2004 self.failUnlessEqual(rec["configured-sharebytes"], size)
2005 # different platforms have different notions of "blocks used by
2006 # this file", so merely assert that it's a number
2007 self.failUnless(rec["actual-diskbytes"] >= 0,
2008 rec["actual-diskbytes"])
2009 self.failUnless(rec["original-diskbytes"] >= 0,
2010 rec["original-diskbytes"])
2011 self.failUnless(rec["configured-diskbytes"] >= 0,
2012 rec["configured-diskbytes"])
2013 d.addCallback(_after_first_cycle)
2014 d.addCallback(lambda ign: self.render1(webstatus))
2015 def _check_html(html):
2016 s = remove_tags(html)
2017 self.failUnlessIn("Expiration Enabled:"
2018 " expired leases will be removed", s)
2019 date = time.strftime("%d-%b-%Y", time.gmtime(then))
2020 self.failUnlessIn("leases created or last renewed before %s"
2021 " will be considered expired" % date, s)
2022 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2023 d.addCallback(_check_html)
2026 def test_only_immutable(self):
2027 basedir = "storage/LeaseCrawler/only_immutable"
2028 fileutil.make_dirs(basedir)
2030 then = int(now - 2000)
2031 ss = StorageServer(basedir, "\x00" * 20,
2032 expiration_enabled=True,
2033 expiration_mode=("date-cutoff",
2034 then, ("immutable",)))
2035 lc = ss.lease_checker
2037 webstatus = StorageStatus(ss)
2039 self.make_shares(ss)
2040 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2041 # set all leases to be expirable
2042 new_expiration_time = now - 3000 + 31*24*60*60
2044 def count_shares(si):
2045 return len(list(ss._iter_share_files(si)))
2046 def _get_sharefile(si):
2047 return list(ss._iter_share_files(si))[0]
2048 def count_leases(si):
2049 return len(list(_get_sharefile(si).get_leases()))
2051 sf0 = _get_sharefile(immutable_si_0)
2052 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2053 sf1 = _get_sharefile(immutable_si_1)
2054 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2055 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2056 sf2 = _get_sharefile(mutable_si_2)
2057 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2058 sf3 = _get_sharefile(mutable_si_3)
2059 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2060 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2062 ss.setServiceParent(self.s)
2064 return bool(lc.get_state()["last-cycle-finished"] is not None)
2065 d = self.poll(_wait)
2067 def _after_first_cycle(ignored):
2068 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2069 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2070 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2071 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2072 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2073 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2074 d.addCallback(_after_first_cycle)
2075 d.addCallback(lambda ign: self.render1(webstatus))
2076 def _check_html(html):
2077 s = remove_tags(html)
2078 self.failUnlessIn("only the following sharetypes will be expired: immutable Next crawl", s)
2079 d.addCallback(_check_html)
2082 def test_only_mutable(self):
2083 basedir = "storage/LeaseCrawler/only_mutable"
2084 fileutil.make_dirs(basedir)
2086 then = int(now - 2000)
2087 ss = StorageServer(basedir, "\x00" * 20,
2088 expiration_enabled=True,
2089 expiration_mode=("date-cutoff",
2090 then, ("mutable",)))
2091 lc = ss.lease_checker
2093 webstatus = StorageStatus(ss)
2095 self.make_shares(ss)
2096 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2097 # set all leases to be expirable
2098 new_expiration_time = now - 3000 + 31*24*60*60
2100 def count_shares(si):
2101 return len(list(ss._iter_share_files(si)))
2102 def _get_sharefile(si):
2103 return list(ss._iter_share_files(si))[0]
2104 def count_leases(si):
2105 return len(list(_get_sharefile(si).get_leases()))
2107 sf0 = _get_sharefile(immutable_si_0)
2108 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2109 sf1 = _get_sharefile(immutable_si_1)
2110 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2111 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2112 sf2 = _get_sharefile(mutable_si_2)
2113 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2114 sf3 = _get_sharefile(mutable_si_3)
2115 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2116 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2118 ss.setServiceParent(self.s)
2120 return bool(lc.get_state()["last-cycle-finished"] is not None)
2121 d = self.poll(_wait)
2123 def _after_first_cycle(ignored):
2124 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2125 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2126 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2127 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2128 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2129 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2130 d.addCallback(_after_first_cycle)
2131 d.addCallback(lambda ign: self.render1(webstatus))
2132 def _check_html(html):
2133 s = remove_tags(html)
2134 self.failUnlessIn("only the following sharetypes will be expired: mutable Next crawl", s)
2135 d.addCallback(_check_html)
2138 def test_bad_mode(self):
2139 basedir = "storage/LeaseCrawler/bad_mode"
2140 fileutil.make_dirs(basedir)
2141 e = self.failUnlessRaises(ValueError,
2142 StorageServer, basedir, "\x00" * 20,
2143 expiration_mode=("bogus", 0))
2144 self.failUnless("garbage-collection mode 'bogus'"
2145 " must be 'age' or 'date-cutoff'" in str(e), str(e))
2147 def test_parse_duration(self):
2151 p = time_format.parse_duration
2152 self.failUnlessEqual(p("7days"), 7*DAY)
2153 self.failUnlessEqual(p("31day"), 31*DAY)
2154 self.failUnlessEqual(p("60 days"), 60*DAY)
2155 self.failUnlessEqual(p("2mo"), 2*MONTH)
2156 self.failUnlessEqual(p("3 month"), 3*MONTH)
2157 self.failUnlessEqual(p("2years"), 2*YEAR)
2158 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2159 self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2162 def test_limited_history(self):
2163 basedir = "storage/LeaseCrawler/limited_history"
2164 fileutil.make_dirs(basedir)
2165 ss = StorageServer(basedir, "\x00" * 20)
2166 # make it start sooner than usual.
2167 lc = ss.lease_checker
2171 # create a few shares, with some leases on them
2172 self.make_shares(ss)
2174 ss.setServiceParent(self.s)
2176 def _wait_until_15_cycles_done():
2177 last = lc.state["last-cycle-finished"]
2178 if last is not None and last >= 15:
2183 d = self.poll(_wait_until_15_cycles_done)
2185 def _check(ignored):
2188 self.failUnlessEqual(len(h), 10)
2189 self.failUnlessEqual(max(h.keys()), 15)
2190 self.failUnlessEqual(min(h.keys()), 6)
2191 d.addCallback(_check)
2194 def test_unpredictable_future(self):
2195 basedir = "storage/LeaseCrawler/unpredictable_future"
2196 fileutil.make_dirs(basedir)
2197 ss = StorageServer(basedir, "\x00" * 20)
2198 # make it start sooner than usual.
2199 lc = ss.lease_checker
2201 lc.cpu_slice = -1.0 # stop quickly
2203 self.make_shares(ss)
2205 ss.setServiceParent(self.s)
2207 d = eventual.fireEventually()
2208 def _check(ignored):
2209 # this should fire after the first bucket is complete, but before
2210 # the first prefix is complete, so the progress-measurer won't
2211 # think we've gotten far enough to raise our percent-complete
2212 # above 0%, triggering the cannot-predict-the-future code in
2213 # expirer.py . This will have to change if/when the
2214 # progress-measurer gets smart enough to count buckets (we'll
2215 # have to interrupt it even earlier, before it's finished the
2218 self.failUnless("cycle-to-date" in s)
2219 self.failUnless("estimated-remaining-cycle" in s)
2220 self.failUnless("estimated-current-cycle" in s)
2222 left = s["estimated-remaining-cycle"]["space-recovered"]
2223 self.failUnlessEqual(left["actual-buckets"], None)
2224 self.failUnlessEqual(left["original-buckets"], None)
2225 self.failUnlessEqual(left["configured-buckets"], None)
2226 self.failUnlessEqual(left["actual-shares"], None)
2227 self.failUnlessEqual(left["original-shares"], None)
2228 self.failUnlessEqual(left["configured-shares"], None)
2229 self.failUnlessEqual(left["actual-diskbytes"], None)
2230 self.failUnlessEqual(left["original-diskbytes"], None)
2231 self.failUnlessEqual(left["configured-diskbytes"], None)
2232 self.failUnlessEqual(left["actual-sharebytes"], None)
2233 self.failUnlessEqual(left["original-sharebytes"], None)
2234 self.failUnlessEqual(left["configured-sharebytes"], None)
2236 full = s["estimated-remaining-cycle"]["space-recovered"]
2237 self.failUnlessEqual(full["actual-buckets"], None)
2238 self.failUnlessEqual(full["original-buckets"], None)
2239 self.failUnlessEqual(full["configured-buckets"], None)
2240 self.failUnlessEqual(full["actual-shares"], None)
2241 self.failUnlessEqual(full["original-shares"], None)
2242 self.failUnlessEqual(full["configured-shares"], None)
2243 self.failUnlessEqual(full["actual-diskbytes"], None)
2244 self.failUnlessEqual(full["original-diskbytes"], None)
2245 self.failUnlessEqual(full["configured-diskbytes"], None)
2246 self.failUnlessEqual(full["actual-sharebytes"], None)
2247 self.failUnlessEqual(full["original-sharebytes"], None)
2248 self.failUnlessEqual(full["configured-sharebytes"], None)
2250 d.addCallback(_check)
2253 def test_no_st_blocks(self):
2254 basedir = "storage/LeaseCrawler/no_st_blocks"
2255 fileutil.make_dirs(basedir)
2256 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2257 expiration_mode=("age",-1000))
2258 # a negative expiration_time= means the "configured-"
2259 # space-recovered counts will be non-zero, since all shares will have
2262 # make it start sooner than usual.
2263 lc = ss.lease_checker
2266 self.make_shares(ss)
2267 ss.setServiceParent(self.s)
2269 return bool(lc.get_state()["last-cycle-finished"] is not None)
2270 d = self.poll(_wait)
2272 def _check(ignored):
2274 last = s["history"][0]
2275 rec = last["space-recovered"]
2276 self.failUnlessEqual(rec["configured-buckets"], 4)
2277 self.failUnlessEqual(rec["configured-shares"], 4)
2278 self.failUnless(rec["configured-sharebytes"] > 0,
2279 rec["configured-sharebytes"])
2280 # without the .st_blocks field in os.stat() results, we should be
2281 # reporting diskbytes==sharebytes
2282 self.failUnlessEqual(rec["configured-sharebytes"],
2283 rec["configured-diskbytes"])
2284 d.addCallback(_check)
2287 def test_share_corruption(self):
2288 basedir = "storage/LeaseCrawler/share_corruption"
2289 fileutil.make_dirs(basedir)
2290 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2291 w = StorageStatus(ss)
2292 # make it start sooner than usual.
2293 lc = ss.lease_checker
2294 lc.stop_after_first_bucket = True
2298 # create a few shares, with some leases on them
2299 self.make_shares(ss)
2301 # now corrupt one, and make sure the lease-checker keeps going
2302 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2303 first = min(self.sis)
2304 first_b32 = base32.b2a(first)
2305 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2308 f.write("BAD MAGIC")
2310 # if get_share_file() doesn't see the correct mutable magic, it
2311 # assumes the file is an immutable share, and then
2312 # immutable.ShareFile sees a bad version. So regardless of which kind
2313 # of share we corrupted, this will trigger an
2314 # UnknownImmutableContainerVersionError.
2316 ss.setServiceParent(self.s)
2318 d = eventual.fireEventually()
2320 # now examine the state right after the first bucket has been
2322 def _after_first_bucket(ignored):
2323 so_far = lc.get_state()["cycle-to-date"]
2324 rec = so_far["space-recovered"]
2325 self.failUnlessEqual(rec["examined-buckets"], 1)
2326 self.failUnlessEqual(rec["examined-shares"], 0)
2327 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2328 d.addCallback(_after_first_bucket)
2330 d.addCallback(lambda ign: self.render_json(w))
2331 def _check_json(json):
2332 data = simplejson.loads(json)
2333 # grr. json turns all dict keys into strings.
2334 so_far = data["lease-checker"]["cycle-to-date"]
2335 corrupt_shares = so_far["corrupt-shares"]
2336 # it also turns all tuples into lists
2337 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2338 d.addCallback(_check_json)
2339 d.addCallback(lambda ign: self.render1(w))
2340 def _check_html(html):
2341 s = remove_tags(html)
2342 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2343 d.addCallback(_check_html)
2346 return bool(lc.get_state()["last-cycle-finished"] is not None)
2347 d.addCallback(lambda ign: self.poll(_wait))
2349 def _after_first_cycle(ignored):
2351 last = s["history"][0]
2352 rec = last["space-recovered"]
2353 self.failUnlessEqual(rec["examined-buckets"], 4)
2354 self.failUnlessEqual(rec["examined-shares"], 3)
2355 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2356 d.addCallback(_after_first_cycle)
2357 d.addCallback(lambda ign: self.render_json(w))
2358 def _check_json_history(json):
2359 data = simplejson.loads(json)
2360 last = data["lease-checker"]["history"]["0"]
2361 corrupt_shares = last["corrupt-shares"]
2362 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2363 d.addCallback(_check_json_history)
2364 d.addCallback(lambda ign: self.render1(w))
2365 def _check_html_history(html):
2366 s = remove_tags(html)
2367 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2368 d.addCallback(_check_html_history)
2371 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2372 UnknownImmutableContainerVersionError)
2377 def render_json(self, page):
2378 d = self.render1(page, args={"t": ["json"]})
class NoStatvfsServer(StorageServer):
    """A StorageServer whose do_statvfs always raises AttributeError,
    simulating platforms (like windows) where os.statvfs does not exist."""
    def do_statvfs(self):
        # mimic the AttributeError that os.statvfs lookup raises on windows
        raise AttributeError
2385 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2388 self.s = service.MultiService()
2389 self.s.startService()
2391 return self.s.stopService()
2393 def test_no_server(self):
2394 w = StorageStatus(None)
2395 html = w.renderSynchronously()
2396 self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
2398 def test_status(self):
2399 basedir = "storage/WebStatus/status"
2400 fileutil.make_dirs(basedir)
2401 ss = StorageServer(basedir, "\x00" * 20)
2402 ss.setServiceParent(self.s)
2403 w = StorageStatus(ss)
2405 def _check_html(html):
2406 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2407 s = remove_tags(html)
2408 self.failUnless("Accepting new shares: Yes" in s, s)
2409 self.failUnless("Reserved space: - 0 B (0)" in s, s)
2410 d.addCallback(_check_html)
2411 d.addCallback(lambda ign: self.render_json(w))
2412 def _check_json(json):
2413 data = simplejson.loads(json)
2415 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2416 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2417 self.failUnless("bucket-counter" in data)
2418 self.failUnless("lease-checker" in data)
2419 d.addCallback(_check_json)
2422 def render_json(self, page):
2423 d = self.render1(page, args={"t": ["json"]})
2426 def test_status_no_statvfs(self):
2427 # windows has no os.statvfs . Make sure the code handles that even on
2429 basedir = "storage/WebStatus/status_no_statvfs"
2430 fileutil.make_dirs(basedir)
2431 ss = NoStatvfsServer(basedir, "\x00" * 20)
2432 ss.setServiceParent(self.s)
2433 w = StorageStatus(ss)
2434 html = w.renderSynchronously()
2435 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2436 s = remove_tags(html)
2437 self.failUnless("Accepting new shares: Yes" in s, s)
2438 self.failUnless("Total disk space: ?" in s, s)
2440 def test_readonly(self):
2441 basedir = "storage/WebStatus/readonly"
2442 fileutil.make_dirs(basedir)
2443 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2444 ss.setServiceParent(self.s)
2445 w = StorageStatus(ss)
2446 html = w.renderSynchronously()
2447 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2448 s = remove_tags(html)
2449 self.failUnless("Accepting new shares: No" in s, s)
2451 def test_reserved(self):
2452 basedir = "storage/WebStatus/reserved"
2453 fileutil.make_dirs(basedir)
2454 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2455 ss.setServiceParent(self.s)
2456 w = StorageStatus(ss)
2457 html = w.renderSynchronously()
2458 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2459 s = remove_tags(html)
2460 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2462 def test_huge_reserved(self):
2463 basedir = "storage/WebStatus/reserved"
2464 fileutil.make_dirs(basedir)
2465 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2466 ss.setServiceParent(self.s)
2467 w = StorageStatus(ss)
2468 html = w.renderSynchronously()
2469 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2470 s = remove_tags(html)
2471 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2473 def test_util(self):
2474 w = StorageStatus(None)
2475 self.failUnlessEqual(w.render_space(None, None), "?")
2476 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2477 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2478 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2479 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2480 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)