2 import time, os.path, stat, re, simplejson, struct
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
30 def __init__(self, ignore_disconnectors=False):
31 self.ignore = ignore_disconnectors
32 self.disconnectors = {}
33 def notifyOnDisconnect(self, f, *args, **kwargs):
37 self.disconnectors[m] = (f, args, kwargs)
39 def dontNotifyOnDisconnect(self, marker):
42 del self.disconnectors[marker]
44 class FakeStatsProvider:
45 def count(self, name, delta=1):
47 def register_producer(self, producer):
50 class Bucket(unittest.TestCase):
51 def make_workdir(self, name):
52 basedir = os.path.join("storage", "Bucket", name)
53 incoming = os.path.join(basedir, "tmp", "bucket")
54 final = os.path.join(basedir, "bucket")
55 fileutil.make_dirs(basedir)
56 fileutil.make_dirs(os.path.join(basedir, "tmp"))
57 return incoming, final
59 def bucket_writer_closed(self, bw, consumed):
61 def add_latency(self, category, latency):
63 def count(self, name, delta=1):
68 renew_secret = os.urandom(32)
69 cancel_secret = os.urandom(32)
70 expiration_time = time.time() + 5000
71 return LeaseInfo(owner_num, renew_secret, cancel_secret,
72 expiration_time, "\x00" * 20)
74 def test_create(self):
75 incoming, final = self.make_workdir("test_create")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*25)
81 bw.remote_write(75, "d"*7)
84 def test_readwrite(self):
85 incoming, final = self.make_workdir("test_readwrite")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*7) # last block may be short
94 br = BucketReader(self, bw.finalhome)
95 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101 def callRemote(self, methname, *args, **kwargs):
103 meth = getattr(self.target, "remote_" + methname)
104 return meth(*args, **kwargs)
105 return defer.maybeDeferred(_call)
107 class BucketProxy(unittest.TestCase):
108 def make_bucket(self, name, size):
109 basedir = os.path.join("storage", "BucketProxy", name)
110 incoming = os.path.join(basedir, "tmp", "bucket")
111 final = os.path.join(basedir, "bucket")
112 fileutil.make_dirs(basedir)
113 fileutil.make_dirs(os.path.join(basedir, "tmp"))
114 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120 def make_lease(self):
122 renew_secret = os.urandom(32)
123 cancel_secret = os.urandom(32)
124 expiration_time = time.time() + 5000
125 return LeaseInfo(owner_num, renew_secret, cancel_secret,
126 expiration_time, "\x00" * 20)
128 def bucket_writer_closed(self, bw, consumed):
130 def add_latency(self, category, latency):
132 def count(self, name, delta=1):
135 def test_create(self):
136 bw, rb, sharefname = self.make_bucket("test_create", 500)
137 bp = WriteBucketProxy(rb,
142 uri_extension_size_max=500, nodeid=None)
143 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
145 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146 # Let's pretend each share has 100 bytes of data, and that there are
147 # 4 segments (25 bytes each), and 8 shares total. So the two
148 # per-segment merkle trees (crypttext_hash_tree,
149 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152 # long. That should make the whole share:
154 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165 uri_extension = "s" + "E"*498 + "e"
167 bw, rb, sharefname = self.make_bucket(name, sharesize)
173 uri_extension_size_max=len(uri_extension),
177 d.addCallback(lambda res: bp.put_block(0, "a"*25))
178 d.addCallback(lambda res: bp.put_block(1, "b"*25))
179 d.addCallback(lambda res: bp.put_block(2, "c"*25))
180 d.addCallback(lambda res: bp.put_block(3, "d"*20))
181 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185 d.addCallback(lambda res: bp.close())
187 # now read everything back
188 def _start_reading(res):
189 br = BucketReader(self, sharefname)
192 rbp = rbp_class(rb, peerid="abc", storage_index="")
193 self.failUnlessIn("to peer", repr(rbp))
194 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
196 d1 = rbp.get_block_data(0, 25, 25)
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206 d1.addCallback(lambda res:
207 self.failUnlessEqual(res, crypttext_hashes))
208 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210 d1.addCallback(lambda res: rbp.get_share_hashes())
211 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212 d1.addCallback(lambda res: rbp.get_uri_extension())
213 d1.addCallback(lambda res:
214 self.failUnlessEqual(res, uri_extension))
218 d.addCallback(_start_reading)
222 def test_readwrite_v1(self):
223 return self._do_test_readwrite("test_readwrite_v1",
224 0x24, WriteBucketProxy, ReadBucketProxy)
226 def test_readwrite_v2(self):
227 return self._do_test_readwrite("test_readwrite_v2",
228 0x44, WriteBucketProxy_v2, ReadBucketProxy)
230 class FakeDiskStorageServer(StorageServer):
232 def get_disk_stats(self):
233 return { 'free_for_nonroot': self.DISKAVAIL, 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }
235 class Server(unittest.TestCase):
238 self.sparent = LoggingServiceParent()
239 self.sparent.startService()
240 self._lease_secret = itertools.count()
242 return self.sparent.stopService()
244 def workdir(self, name):
245 basedir = os.path.join("storage", "Server", name)
248 def create(self, name, reserved_space=0, klass=StorageServer):
249 workdir = self.workdir(name)
250 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
251 stats_provider=FakeStatsProvider())
252 ss.setServiceParent(self.sparent)
255 def test_create(self):
256 self.create("test_create")
258 def allocate(self, ss, storage_index, sharenums, size, canary=None):
259 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
262 canary = FakeCanary()
263 return ss.remote_allocate_buckets(storage_index,
264 renew_secret, cancel_secret,
265 sharenums, size, canary)
267 def test_large_share(self):
268 ss = self.create("test_large_share")
270 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
271 self.failUnlessEqual(already, set())
272 self.failUnlessEqual(set(writers.keys()), set([0]))
274 shnum, bucket = writers.items()[0]
275 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
276 bucket.remote_write(2**32, "ab")
277 bucket.remote_close()
279 readers = ss.remote_get_buckets("allocate")
280 reader = readers[shnum]
281 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
282 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files)."
284 def test_dont_overfill_dirs(self):
286 This test asserts that if you add a second share whose storage index
287 share lots of leading bits with an extant share (but isn't the exact
288 same storage index), this won't add an entry to the share directory.
290 ss = self.create("test_dont_overfill_dirs")
291 already, writers = self.allocate(ss, "storageindex", [0], 10)
292 for i, wb in writers.items():
293 wb.remote_write(0, "%10d" % i)
295 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
297 children_of_storedir = set(os.listdir(storedir))
299 # Now store another one under another storageindex that has leading
300 # chars the same as the first storageindex.
301 already, writers = self.allocate(ss, "storageindey", [0], 10)
302 for i, wb in writers.items():
303 wb.remote_write(0, "%10d" % i)
305 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
307 new_children_of_storedir = set(os.listdir(storedir))
308 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
310 def test_remove_incoming(self):
311 ss = self.create("test_remove_incoming")
312 already, writers = self.allocate(ss, "vid", range(3), 10)
313 for i,wb in writers.items():
314 wb.remote_write(0, "%10d" % i)
316 incoming_share_dir = wb.incominghome
317 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
318 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
319 incoming_dir = os.path.dirname(incoming_prefix_dir)
320 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
321 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
322 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
324 def test_abort(self):
325 # remote_abort, when called on a writer, should make sure that
326 # the allocated size of the bucket is not counted by the storage
327 # server when accounting for space.
328 ss = self.create("test_abort")
329 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
330 self.failIfEqual(ss.allocated_size(), 0)
332 # Now abort the writers.
333 for writer in writers.itervalues():
334 writer.remote_abort()
335 self.failUnlessEqual(ss.allocated_size(), 0)
338 def test_allocate(self):
339 ss = self.create("test_allocate")
341 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
343 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
344 self.failUnlessEqual(already, set())
345 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
347 # while the buckets are open, they should not count as readable
348 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
351 for i,wb in writers.items():
352 wb.remote_write(0, "%25d" % i)
354 # aborting a bucket that was already closed is a no-op
357 # now they should be readable
358 b = ss.remote_get_buckets("allocate")
359 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
360 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
362 self.failUnlessIn("BucketReader", b_str)
363 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
365 # now if we ask about writing again, the server should offer those
366 # three buckets as already present. It should offer them even if we
367 # don't ask about those specific ones.
368 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
369 self.failUnlessEqual(already, set([0,1,2]))
370 self.failUnlessEqual(set(writers.keys()), set([3,4]))
372 # while those two buckets are open for writing, the server should
373 # refuse to offer them to uploaders
375 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
376 self.failUnlessEqual(already2, set([0,1,2]))
377 self.failUnlessEqual(set(writers2.keys()), set([5]))
379 # aborting the writes should remove the tempfiles
380 for i,wb in writers2.items():
382 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
383 self.failUnlessEqual(already2, set([0,1,2]))
384 self.failUnlessEqual(set(writers2.keys()), set([5]))
386 for i,wb in writers2.items():
388 for i,wb in writers.items():
391 def test_bad_container_version(self):
392 ss = self.create("test_bad_container_version")
393 a,w = self.allocate(ss, "si1", [0], 10)
394 w[0].remote_write(0, "\xff"*10)
397 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
400 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
403 ss.remote_get_buckets("allocate")
405 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
406 ss.remote_get_buckets, "si1")
407 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
409 def test_disconnect(self):
410 # simulate a disconnection
411 ss = self.create("test_disconnect")
412 canary = FakeCanary()
413 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
414 self.failUnlessEqual(already, set())
415 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
416 for (f,args,kwargs) in canary.disconnectors.values():
421 # that ought to delete the incoming shares
422 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
423 self.failUnlessEqual(already, set())
424 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
426 def test_reserved_space(self):
427 ss = self.create("test_reserved_space", reserved_space=10000,
428 klass=FakeDiskStorageServer)
429 # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
431 # 15k available, 10k reserved, leaves 5k for shares
433 # a newly created and filled share incurs this much overhead, beyond
434 # the size we request.
436 LEASE_SIZE = 4+32+32+4
437 canary = FakeCanary(True)
438 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
439 self.failUnlessEqual(len(writers), 3)
440 # now the StorageServer should have 3000 bytes provisionally
441 # allocated, allowing only 2000 more to be claimed
442 self.failUnlessEqual(len(ss._active_writers), 3)
444 # allocating 1001-byte shares only leaves room for one
445 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
446 self.failUnlessEqual(len(writers2), 1)
447 self.failUnlessEqual(len(ss._active_writers), 4)
449 # we abandon the first set, so their provisional allocation should be
453 self.failUnlessEqual(len(ss._active_writers), 1)
454 # now we have a provisional allocation of 1001 bytes
456 # and we close the second set, so their provisional allocation should
457 # become real, long-term allocation, and grows to include the
459 for bw in writers2.values():
460 bw.remote_write(0, "a"*25)
465 self.failUnlessEqual(len(ss._active_writers), 0)
467 allocated = 1001 + OVERHEAD + LEASE_SIZE
469 # we have to manually increase DISKAVAIL, since we're not doing real
471 ss.DISKAVAIL -= allocated
473 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
474 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
475 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
476 self.failUnlessEqual(len(writers3), 39)
477 self.failUnlessEqual(len(ss._active_writers), 39)
481 self.failUnlessEqual(len(ss._active_writers), 0)
482 ss.disownServiceParent()
485 def test_disk_stats(self):
486 # This will spuriously fail if there is zero disk space left (but so will other tests).
487 ss = self.create("test_disk_stats", reserved_space=0)
489 disk = ss.get_disk_stats()
490 self.failUnless(disk['total'] > 0, disk['total'])
491 self.failUnless(disk['used'] > 0, disk['used'])
492 self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
493 self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
494 self.failUnless(disk['avail'] > 0, disk['avail'])
496 def test_disk_stats_avail_nonnegative(self):
497 ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)
499 disk = ss.get_disk_stats()
500 self.failUnlessEqual(disk['avail'], 0)
503 basedir = self.workdir("test_seek_behavior")
504 fileutil.make_dirs(basedir)
505 filename = os.path.join(basedir, "testfile")
506 f = open(filename, "wb")
509 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
510 # files. mode="a" preserves previous contents but does not allow
511 # seeking-to-create-holes. mode="r+" allows both.
512 f = open(filename, "rb+")
516 filelen = os.stat(filename)[stat.ST_SIZE]
517 self.failUnlessEqual(filelen, 100+3)
518 f2 = open(filename, "rb")
519 self.failUnlessEqual(f2.read(5), "start")
522 def test_leases(self):
523 ss = self.create("test_leases")
524 canary = FakeCanary()
528 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
529 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
530 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
531 sharenums, size, canary)
532 self.failUnlessEqual(len(already), 0)
533 self.failUnlessEqual(len(writers), 5)
534 for wb in writers.values():
537 leases = list(ss.get_leases("si0"))
538 self.failUnlessEqual(len(leases), 1)
539 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
541 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
542 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
543 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
544 sharenums, size, canary)
545 for wb in writers.values():
548 # take out a second lease on si1
549 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
550 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
551 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
552 sharenums, size, canary)
553 self.failUnlessEqual(len(already), 5)
554 self.failUnlessEqual(len(writers), 0)
556 leases = list(ss.get_leases("si1"))
557 self.failUnlessEqual(len(leases), 2)
558 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
560 # and a third lease, using add-lease
561 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
562 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
563 ss.remote_add_lease("si1", rs2a, cs2a)
564 leases = list(ss.get_leases("si1"))
565 self.failUnlessEqual(len(leases), 3)
566 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
568 # add-lease on a missing storage index is silently ignored
569 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
571 # check that si0 is readable
572 readers = ss.remote_get_buckets("si0")
573 self.failUnlessEqual(len(readers), 5)
575 # renew the first lease. Only the proper renew_secret should work
576 ss.remote_renew_lease("si0", rs0)
577 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
578 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
580 # check that si0 is still readable
581 readers = ss.remote_get_buckets("si0")
582 self.failUnlessEqual(len(readers), 5)
585 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
586 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
587 ss.remote_cancel_lease("si0", cs0)
589 # si0 should now be gone
590 readers = ss.remote_get_buckets("si0")
591 self.failUnlessEqual(len(readers), 0)
592 # and the renew should no longer work
593 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
596 # cancel the first lease on si1, leaving the second and third in place
597 ss.remote_cancel_lease("si1", cs1)
598 readers = ss.remote_get_buckets("si1")
599 self.failUnlessEqual(len(readers), 5)
600 # the corresponding renew should no longer work
601 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
603 leases = list(ss.get_leases("si1"))
604 self.failUnlessEqual(len(leases), 2)
605 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
607 ss.remote_renew_lease("si1", rs2)
608 # cancelling the second and third should make it go away
609 ss.remote_cancel_lease("si1", cs2)
610 ss.remote_cancel_lease("si1", cs2a)
611 readers = ss.remote_get_buckets("si1")
612 self.failUnlessEqual(len(readers), 0)
613 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
614 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
615 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
617 leases = list(ss.get_leases("si1"))
618 self.failUnlessEqual(len(leases), 0)
621 # test overlapping uploads
622 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
623 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
624 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
625 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
626 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
627 sharenums, size, canary)
628 self.failUnlessEqual(len(already), 0)
629 self.failUnlessEqual(len(writers), 5)
630 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
631 sharenums, size, canary)
632 self.failUnlessEqual(len(already2), 0)
633 self.failUnlessEqual(len(writers2), 0)
634 for wb in writers.values():
637 leases = list(ss.get_leases("si3"))
638 self.failUnlessEqual(len(leases), 1)
640 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
641 sharenums, size, canary)
642 self.failUnlessEqual(len(already3), 5)
643 self.failUnlessEqual(len(writers3), 0)
645 leases = list(ss.get_leases("si3"))
646 self.failUnlessEqual(len(leases), 2)
648 def test_readonly(self):
649 workdir = self.workdir("test_readonly")
650 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
651 ss.setServiceParent(self.sparent)
653 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
654 self.failUnlessEqual(already, set())
655 self.failUnlessEqual(writers, {})
657 stats = ss.get_stats()
658 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
659 if "storage_server.disk_avail" in stats:
660 # Some platforms may not have an API to get disk stats.
661 # But if there are stats, readonly_storage means disk_avail=0
662 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
664 def test_discard(self):
665 # discard is really only used for other tests, but we test it anyways
666 workdir = self.workdir("test_discard")
667 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
668 ss.setServiceParent(self.sparent)
670 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
671 self.failUnlessEqual(already, set())
672 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
673 for i,wb in writers.items():
674 wb.remote_write(0, "%25d" % i)
676 # since we discard the data, the shares should be present but sparse.
677 # Since we write with some seeks, the data we read back will be all
679 b = ss.remote_get_buckets("vid")
680 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
681 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
683 def test_advise_corruption(self):
684 workdir = self.workdir("test_advise_corruption")
685 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
686 ss.setServiceParent(self.sparent)
688 si0_s = base32.b2a("si0")
689 ss.remote_advise_corrupt_share("immutable", "si0", 0,
690 "This share smells funny.\n")
691 reportdir = os.path.join(workdir, "corruption-advisories")
692 reports = os.listdir(reportdir)
693 self.failUnlessEqual(len(reports), 1)
694 report_si0 = reports[0]
695 self.failUnlessIn(si0_s, report_si0)
696 f = open(os.path.join(reportdir, report_si0), "r")
699 self.failUnlessIn("type: immutable", report)
700 self.failUnlessIn("storage_index: %s" % si0_s, report)
701 self.failUnlessIn("share_number: 0", report)
702 self.failUnlessIn("This share smells funny.", report)
704 # test the RIBucketWriter version too
705 si1_s = base32.b2a("si1")
706 already,writers = self.allocate(ss, "si1", [1], 75)
707 self.failUnlessEqual(already, set())
708 self.failUnlessEqual(set(writers.keys()), set([1]))
709 writers[1].remote_write(0, "data")
710 writers[1].remote_close()
712 b = ss.remote_get_buckets("si1")
713 self.failUnlessEqual(set(b.keys()), set([1]))
714 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
716 reports = os.listdir(reportdir)
717 self.failUnlessEqual(len(reports), 2)
718 report_si1 = [r for r in reports if si1_s in r][0]
719 f = open(os.path.join(reportdir, report_si1), "r")
722 self.failUnlessIn("type: immutable", report)
723 self.failUnlessIn("storage_index: %s" % si1_s, report)
724 self.failUnlessIn("share_number: 1", report)
725 self.failUnlessIn("This share tastes like dust.", report)
729 class MutableServer(unittest.TestCase):
732 self.sparent = LoggingServiceParent()
733 self._lease_secret = itertools.count()
735 return self.sparent.stopService()
737 def workdir(self, name):
738 basedir = os.path.join("storage", "MutableServer", name)
741 def create(self, name):
742 workdir = self.workdir(name)
743 ss = StorageServer(workdir, "\x00" * 20)
744 ss.setServiceParent(self.sparent)
747 def test_create(self):
748 self.create("test_create")
750 def write_enabler(self, we_tag):
751 return hashutil.tagged_hash("we_blah", we_tag)
753 def renew_secret(self, tag):
754 return hashutil.tagged_hash("renew_blah", str(tag))
756 def cancel_secret(self, tag):
757 return hashutil.tagged_hash("cancel_blah", str(tag))
759 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
760 write_enabler = self.write_enabler(we_tag)
761 renew_secret = self.renew_secret(lease_tag)
762 cancel_secret = self.cancel_secret(lease_tag)
763 rstaraw = ss.remote_slot_testv_and_readv_and_writev
764 testandwritev = dict( [ (shnum, ([], [], None) )
765 for shnum in sharenums ] )
767 rc = rstaraw(storage_index,
768 (write_enabler, renew_secret, cancel_secret),
771 (did_write, readv_data) = rc
772 self.failUnless(did_write)
773 self.failUnless(isinstance(readv_data, dict))
774 self.failUnlessEqual(len(readv_data), 0)
776 def test_bad_magic(self):
777 ss = self.create("test_bad_magic")
778 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
779 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
784 read = ss.remote_slot_readv
785 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
786 read, "si1", [0], [(0,10)])
787 self.failUnlessIn(" had magic ", str(e))
788 self.failUnlessIn(" but we wanted ", str(e))
790 def test_container_size(self):
791 ss = self.create("test_container_size")
792 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
794 read = ss.remote_slot_readv
795 rstaraw = ss.remote_slot_testv_and_readv_and_writev
796 secrets = ( self.write_enabler("we1"),
797 self.renew_secret("we1"),
798 self.cancel_secret("we1") )
799 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
800 answer = rstaraw("si1", secrets,
801 {0: ([], [(0,data)], len(data)+12)},
803 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
805 # trying to make the container too large will raise an exception
806 TOOBIG = MutableShareFile.MAX_SIZE + 10
807 self.failUnlessRaises(DataTooLargeError,
808 rstaraw, "si1", secrets,
809 {0: ([], [(0,data)], TOOBIG)},
812 # it should be possible to make the container smaller, although at
813 # the moment this doesn't actually affect the share, unless the
814 # container size is dropped to zero, in which case the share is
816 answer = rstaraw("si1", secrets,
817 {0: ([], [(0,data)], len(data)+8)},
819 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
821 answer = rstaraw("si1", secrets,
822 {0: ([], [(0,data)], 0)},
824 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
826 read_answer = read("si1", [0], [(0,10)])
827 self.failUnlessEqual(read_answer, {})
829 def test_allocate(self):
830 ss = self.create("test_allocate")
831 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
834 read = ss.remote_slot_readv
835 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
837 self.failUnlessEqual(read("si1", [], [(0, 10)]),
838 {0: [""], 1: [""], 2: [""]})
839 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
843 secrets = ( self.write_enabler("we1"),
844 self.renew_secret("we1"),
845 self.cancel_secret("we1") )
846 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
847 write = ss.remote_slot_testv_and_readv_and_writev
848 answer = write("si1", secrets,
849 {0: ([], [(0,data)], None)},
851 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
853 self.failUnlessEqual(read("si1", [0], [(0,20)]),
854 {0: ["00000000001111111111"]})
855 self.failUnlessEqual(read("si1", [0], [(95,10)]),
857 #self.failUnlessEqual(s0.remote_get_length(), 100)
859 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
860 f = self.failUnlessRaises(BadWriteEnablerError,
861 write, "si1", bad_secrets,
863 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
865 # this testv should fail
866 answer = write("si1", secrets,
867 {0: ([(0, 12, "eq", "444444444444"),
868 (20, 5, "eq", "22222"),
875 self.failUnlessEqual(answer, (False,
876 {0: ["000000000011", "22222"],
880 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
883 answer = write("si1", secrets,
884 {0: ([(10, 5, "lt", "11111"),
891 self.failUnlessEqual(answer, (False,
896 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
899 def test_operators(self):
900 # test operators, the data we're comparing is '11111' in all cases.
901 # test both fail+pass, reset data after each one.
902 ss = self.create("test_operators")
904 secrets = ( self.write_enabler("we1"),
905 self.renew_secret("we1"),
906 self.cancel_secret("we1") )
907 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
908 write = ss.remote_slot_testv_and_readv_and_writev
909 read = ss.remote_slot_readv
912 write("si1", secrets,
913 {0: ([], [(0,data)], None)},
919 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
924 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
925 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
926 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
929 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
934 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
935 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
938 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
943 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
948 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
953 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
957 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
962 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
966 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
971 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
972 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
976 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
981 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
982 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
985 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
990 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
991 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
995 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1000 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1001 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1004 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1009 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1010 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1014 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1019 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1023 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1028 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1029 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1032 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1037 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1042 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1047 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1048 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1051 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1056 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1057 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1060 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1065 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1066 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1069 # finally, test some operators against empty shares
1070 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1075 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1076 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1079 def test_readv(self):
1080 ss = self.create("test_readv")
1081 secrets = ( self.write_enabler("we1"),
1082 self.renew_secret("we1"),
1083 self.cancel_secret("we1") )
1084 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1085 write = ss.remote_slot_testv_and_readv_and_writev
1086 read = ss.remote_slot_readv
1087 data = [("%d" % i) * 100 for i in range(3)]
1088 rc = write("si1", secrets,
1089 {0: ([], [(0,data[0])], None),
1090 1: ([], [(0,data[1])], None),
1091 2: ([], [(0,data[2])], None),
1093 self.failUnlessEqual(rc, (True, {}))
1095 answer = read("si1", [], [(0, 10)])
1096 self.failUnlessEqual(answer, {0: ["0"*10],
1100 def compare_leases_without_timestamps(self, leases_a, leases_b):
1101 self.failUnlessEqual(len(leases_a), len(leases_b))
1102 for i in range(len(leases_a)):
1105 self.failUnlessEqual(a.owner_num, b.owner_num)
1106 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1107 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1108 self.failUnlessEqual(a.nodeid, b.nodeid)
1110 def compare_leases(self, leases_a, leases_b):
1111 self.failUnlessEqual(len(leases_a), len(leases_b))
1112 for i in range(len(leases_a)):
1115 self.failUnlessEqual(a.owner_num, b.owner_num)
1116 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1117 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1118 self.failUnlessEqual(a.nodeid, b.nodeid)
1119 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1121 def test_leases(self):
1122 ss = self.create("test_leases")
1124 return ( self.write_enabler("we1"),
1125 self.renew_secret("we1-%d" % n),
1126 self.cancel_secret("we1-%d" % n) )
1127 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1128 write = ss.remote_slot_testv_and_readv_and_writev
1129 read = ss.remote_slot_readv
1130 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1131 self.failUnlessEqual(rc, (True, {}))
1133 # create a random non-numeric file in the bucket directory, to
1134 # exercise the code that's supposed to ignore those.
1135 bucket_dir = os.path.join(self.workdir("test_leases"),
1136 "shares", storage_index_to_dir("si1"))
1137 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1138 f.write("you ought to be ignoring me\n")
1141 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1142 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1144 # add-lease on a missing storage index is silently ignored
1145 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1147 # re-allocate the slots and use the same secrets, that should update
1149 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1150 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1153 ss.remote_renew_lease("si1", secrets(0)[1])
1154 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1156 # now allocate them with a bunch of different secrets, to trigger the
1157 # extended lease code. Use add_lease for one of them.
1158 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1159 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1160 secrets2 = secrets(2)
1161 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1162 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1163 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1164 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1165 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1167 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1169 # cancel one of them
1170 ss.remote_cancel_lease("si1", secrets(5)[2])
1171 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1173 all_leases = list(s0.get_leases())
1174 # and write enough data to expand the container, forcing the server
1175 # to move the leases
1176 write("si1", secrets(0),
1177 {0: ([], [(0,data)], 200), },
1180 # read back the leases, make sure they're still intact.
1181 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1183 ss.remote_renew_lease("si1", secrets(0)[1])
1184 ss.remote_renew_lease("si1", secrets(1)[1])
1185 ss.remote_renew_lease("si1", secrets(2)[1])
1186 ss.remote_renew_lease("si1", secrets(3)[1])
1187 ss.remote_renew_lease("si1", secrets(4)[1])
1188 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1189 # get a new copy of the leases, with the current timestamps. Reading
1190 # data and failing to renew/cancel leases should leave the timestamps
1192 all_leases = list(s0.get_leases())
1193 # renewing with a bogus token should prompt an error message
1195 # examine the exception thus raised, make sure the old nodeid is
1196 # present, to provide for share migration
1197 e = self.failUnlessRaises(IndexError,
1198 ss.remote_renew_lease, "si1",
1201 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1202 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1203 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1205 # same for cancelling
1206 self.failUnlessRaises(IndexError,
1207 ss.remote_cancel_lease, "si1",
1209 self.compare_leases(all_leases, list(s0.get_leases()))
1211 # reading shares should not modify the timestamp
1212 read("si1", [], [(0,200)])
1213 self.compare_leases(all_leases, list(s0.get_leases()))
1215 write("si1", secrets(0),
1216 {0: ([], [(200, "make me bigger")], None)}, [])
1217 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1219 write("si1", secrets(0),
1220 {0: ([], [(500, "make me really bigger")], None)}, [])
1221 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1223 # now cancel them all
1224 ss.remote_cancel_lease("si1", secrets(0)[2])
1225 ss.remote_cancel_lease("si1", secrets(1)[2])
1226 ss.remote_cancel_lease("si1", secrets(2)[2])
1227 ss.remote_cancel_lease("si1", secrets(3)[2])
1229 # the slot should still be there
1230 remaining_shares = read("si1", [], [(0,10)])
1231 self.failUnlessEqual(len(remaining_shares), 1)
1232 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1234 # cancelling a non-existent lease should raise an IndexError
1235 self.failUnlessRaises(IndexError,
1236 ss.remote_cancel_lease, "si1", "nonsecret")
1238 # and the slot should still be there
1239 remaining_shares = read("si1", [], [(0,10)])
1240 self.failUnlessEqual(len(remaining_shares), 1)
1241 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1243 ss.remote_cancel_lease("si1", secrets(4)[2])
1244 # now the slot should be gone
1245 no_shares = read("si1", [], [(0,10)])
1246 self.failUnlessEqual(no_shares, {})
1248 # cancelling a lease on a non-existent share should raise an IndexError
1249 self.failUnlessRaises(IndexError,
1250 ss.remote_cancel_lease, "si2", "nonsecret")
1252 def test_remove(self):
1253 ss = self.create("test_remove")
1254 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1256 readv = ss.remote_slot_readv
1257 writev = ss.remote_slot_testv_and_readv_and_writev
1258 secrets = ( self.write_enabler("we1"),
1259 self.renew_secret("we1"),
1260 self.cancel_secret("we1") )
1261 # delete sh0 by setting its size to zero
1262 answer = writev("si1", secrets,
1265 # the answer should mention all the shares that existed before the
1267 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1268 # but a new read should show only sh1 and sh2
1269 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1272 # delete sh1 by setting its size to zero
1273 answer = writev("si1", secrets,
1276 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1277 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1280 # delete sh2 by setting its size to zero
1281 answer = writev("si1", secrets,
1284 self.failUnlessEqual(answer, (True, {2:[]}) )
1285 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1287 # and the bucket directory should now be gone
1288 si = base32.b2a("si1")
1289 # note: this is a detail of the storage server implementation, and
1290 # may change in the future
1292 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1293 bucketdir = os.path.join(prefixdir, si)
1294 self.failUnless(os.path.exists(prefixdir), prefixdir)
1295 self.failIf(os.path.exists(bucketdir), bucketdir)
1297 class Stats(unittest.TestCase):
1300 self.sparent = LoggingServiceParent()
1301 self._lease_secret = itertools.count()
1303 return self.sparent.stopService()
1305 def workdir(self, name):
1306 basedir = os.path.join("storage", "Server", name)
1309 def create(self, name):
1310 workdir = self.workdir(name)
1311 ss = StorageServer(workdir, "\x00" * 20)
1312 ss.setServiceParent(self.sparent)
1315 def test_latencies(self):
1316 ss = self.create("test_latencies")
1317 for i in range(10000):
1318 ss.add_latency("allocate", 1.0 * i)
1319 for i in range(1000):
1320 ss.add_latency("renew", 1.0 * i)
1322 ss.add_latency("cancel", 2.0 * i)
1323 ss.add_latency("get", 5.0)
1325 output = ss.get_latencies()
1327 self.failUnlessEqual(sorted(output.keys()),
1328 sorted(["allocate", "renew", "cancel", "get"]))
1329 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1330 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1331 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1332 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1333 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1334 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1335 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1336 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1337 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1339 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1340 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1341 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
1342 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1343 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1344 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1345 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1346 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1347 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1349 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1350 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1351 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1, output)
1352 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
1353 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1354 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1355 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1356 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1357 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1359 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1360 self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1361 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1362 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1363 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1364 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1365 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1366 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1367 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1370 s = re.sub(r'<[^>]*>', ' ', s)
1371 s = re.sub(r'\s+', ' ', s)
1374 class MyBucketCountingCrawler(BucketCountingCrawler):
1375 def finished_prefix(self, cycle, prefix):
1376 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1378 d = self.hook_ds.pop(0)
1381 class MyStorageServer(StorageServer):
1382 def add_bucket_counter(self):
1383 statefile = os.path.join(self.storedir, "bucket_counter.state")
1384 self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1385 self.bucket_counter.setServiceParent(self)
1387 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1390 self.s = service.MultiService()
1391 self.s.startService()
1393 return self.s.stopService()
1395 def test_bucket_counter(self):
1396 basedir = "storage/BucketCounter/bucket_counter"
1397 fileutil.make_dirs(basedir)
1398 ss = StorageServer(basedir, "\x00" * 20)
1399 # to make sure we capture the bucket-counting-crawler in the middle
1400 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1401 # also make it start sooner than usual.
1402 ss.bucket_counter.slow_start = 0
1403 orig_cpu_slice = ss.bucket_counter.cpu_slice
1404 ss.bucket_counter.cpu_slice = 0
1405 ss.setServiceParent(self.s)
1407 w = StorageStatus(ss)
1409 # this sample is before the crawler has started doing anything
1410 html = w.renderSynchronously()
1411 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1412 s = remove_tags(html)
1413 self.failUnlessIn("Accepting new shares: Yes", s)
1414 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1415 self.failUnlessIn("Total buckets: Not computed yet", s)
1416 self.failUnlessIn("Next crawl in", s)
1418 # give the bucket-counting-crawler one tick to get started. The
1419 # cpu_slice=0 will force it to yield right after it processes the
1422 d = fireEventually()
1423 def _check(ignored):
1424 # are we really right after the first prefix?
1425 state = ss.bucket_counter.get_state()
1426 if state["last-complete-prefix"] is None:
1427 d2 = fireEventually()
1428 d2.addCallback(_check)
1430 self.failUnlessEqual(state["last-complete-prefix"],
1431 ss.bucket_counter.prefixes[0])
1432 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1433 html = w.renderSynchronously()
1434 s = remove_tags(html)
1435 self.failUnlessIn(" Current crawl ", s)
1436 self.failUnlessIn(" (next work in ", s)
1437 d.addCallback(_check)
1439 # now give it enough time to complete a full cycle
1441 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1442 d.addCallback(lambda ignored: self.poll(_watch))
1443 def _check2(ignored):
1444 ss.bucket_counter.cpu_slice = orig_cpu_slice
1445 html = w.renderSynchronously()
1446 s = remove_tags(html)
1447 self.failUnlessIn("Total buckets: 0 (the number of", s)
1448 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1449 d.addCallback(_check2)
1452 def test_bucket_counter_cleanup(self):
1453 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1454 fileutil.make_dirs(basedir)
1455 ss = StorageServer(basedir, "\x00" * 20)
1456 # to make sure we capture the bucket-counting-crawler in the middle
1457 # of a cycle, we reach in and reduce its maximum slice time to 0.
1458 ss.bucket_counter.slow_start = 0
1459 orig_cpu_slice = ss.bucket_counter.cpu_slice
1460 ss.bucket_counter.cpu_slice = 0
1461 ss.setServiceParent(self.s)
1463 d = fireEventually()
1465 def _after_first_prefix(ignored):
1466 state = ss.bucket_counter.state
1467 if state["last-complete-prefix"] is None:
1468 d2 = fireEventually()
1469 d2.addCallback(_after_first_prefix)
1471 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1472 # now sneak in and mess with its state, to make sure it cleans up
1473 # properly at the end of the cycle
1474 self.failUnlessEqual(state["last-complete-prefix"],
1475 ss.bucket_counter.prefixes[0])
1476 state["bucket-counts"][-12] = {}
1477 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1478 ss.bucket_counter.save_state()
1479 d.addCallback(_after_first_prefix)
1481 # now give it enough time to complete a cycle
1483 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1484 d.addCallback(lambda ignored: self.poll(_watch))
1485 def _check2(ignored):
1486 ss.bucket_counter.cpu_slice = orig_cpu_slice
1487 s = ss.bucket_counter.get_state()
1488 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1489 self.failIf("bogusprefix!" in s["storage-index-samples"],
1490 s["storage-index-samples"].keys())
1491 d.addCallback(_check2)
1494 def test_bucket_counter_eta(self):
1495 basedir = "storage/BucketCounter/bucket_counter_eta"
1496 fileutil.make_dirs(basedir)
1497 ss = MyStorageServer(basedir, "\x00" * 20)
1498 ss.bucket_counter.slow_start = 0
1499 # these will be fired inside finished_prefix()
1500 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1501 w = StorageStatus(ss)
1503 d = defer.Deferred()
1505 def _check_1(ignored):
1506 # no ETA is available yet
1507 html = w.renderSynchronously()
1508 s = remove_tags(html)
1509 self.failUnlessIn("complete (next work", s)
1511 def _check_2(ignored):
1512 # one prefix has finished, so an ETA based upon that elapsed time
1513 # should be available.
1514 html = w.renderSynchronously()
1515 s = remove_tags(html)
1516 self.failUnlessIn("complete (ETA ", s)
1518 def _check_3(ignored):
1519 # two prefixes have finished
1520 html = w.renderSynchronously()
1521 s = remove_tags(html)
1522 self.failUnlessIn("complete (ETA ", s)
1525 hooks[0].addCallback(_check_1).addErrback(d.errback)
1526 hooks[1].addCallback(_check_2).addErrback(d.errback)
1527 hooks[2].addCallback(_check_3).addErrback(d.errback)
1529 ss.setServiceParent(self.s)
1532 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1533 stop_after_first_bucket = False
1534 def process_bucket(self, *args, **kwargs):
1535 LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1536 if self.stop_after_first_bucket:
1537 self.stop_after_first_bucket = False
1538 self.cpu_slice = -1.0
1539 def yielding(self, sleep_time):
1540 if not self.stop_after_first_bucket:
1541 self.cpu_slice = 500
1543 class BrokenStatResults:
1545 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1548 bsr = BrokenStatResults()
1549 for attrname in dir(s):
1550 if attrname.startswith("_"):
1552 if attrname == "st_blocks":
1554 setattr(bsr, attrname, getattr(s, attrname))
1557 class InstrumentedStorageServer(StorageServer):
1558 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1559 class No_ST_BLOCKS_StorageServer(StorageServer):
1560 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1562 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1565 self.s = service.MultiService()
1566 self.s.startService()
1568 return self.s.stopService()
1570 def make_shares(self, ss):
1572 return (si, hashutil.tagged_hash("renew", si),
1573 hashutil.tagged_hash("cancel", si))
1574 def make_mutable(si):
1575 return (si, hashutil.tagged_hash("renew", si),
1576 hashutil.tagged_hash("cancel", si),
1577 hashutil.tagged_hash("write-enabler", si))
1578 def make_extra_lease(si, num):
1579 return (hashutil.tagged_hash("renew-%d" % num, si),
1580 hashutil.tagged_hash("cancel-%d" % num, si))
1582 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1583 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1584 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1585 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1586 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1587 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1589 canary = FakeCanary()
1590 # note: 'tahoe debug dump-share' will not handle this file, since the
1591 # inner contents are not a valid CHK share
1592 data = "\xff" * 1000
1594 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1596 w[0].remote_write(0, data)
1599 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1601 w[0].remote_write(0, data)
1603 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1605 writev = ss.remote_slot_testv_and_readv_and_writev
1606 writev(mutable_si_2, (we2, rs2, cs2),
1607 {0: ([], [(0,data)], len(data))}, [])
1608 writev(mutable_si_3, (we3, rs3, cs3),
1609 {0: ([], [(0,data)], len(data))}, [])
1610 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1612 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1613 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1614 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1616 def test_basic(self):
1617 basedir = "storage/LeaseCrawler/basic"
1618 fileutil.make_dirs(basedir)
1619 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1620 # make it start sooner than usual.
1621 lc = ss.lease_checker
1624 lc.stop_after_first_bucket = True
1625 webstatus = StorageStatus(ss)
1627 # create a few shares, with some leases on them
1628 self.make_shares(ss)
1629 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1631 # add a non-sharefile to exercise another code path
1632 fn = os.path.join(ss.sharedir,
1633 storage_index_to_dir(immutable_si_0),
1636 f.write("I am not a share.\n")
1639 # this is before the crawl has started, so we're not in a cycle yet
1640 initial_state = lc.get_state()
1641 self.failIf(lc.get_progress()["cycle-in-progress"])
1642 self.failIfIn("cycle-to-date", initial_state)
1643 self.failIfIn("estimated-remaining-cycle", initial_state)
1644 self.failIfIn("estimated-current-cycle", initial_state)
1645 self.failUnlessIn("history", initial_state)
1646 self.failUnlessEqual(initial_state["history"], {})
1648 ss.setServiceParent(self.s)
1652 d = fireEventually()
1654 # now examine the state right after the first bucket has been
1656 def _after_first_bucket(ignored):
1657 initial_state = lc.get_state()
1658 if "cycle-to-date" not in initial_state:
1659 d2 = fireEventually()
1660 d2.addCallback(_after_first_bucket)
1662 self.failUnlessIn("cycle-to-date", initial_state)
1663 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1664 self.failUnlessIn("estimated-current-cycle", initial_state)
1665 self.failUnlessIn("history", initial_state)
1666 self.failUnlessEqual(initial_state["history"], {})
1668 so_far = initial_state["cycle-to-date"]
1669 self.failUnlessEqual(so_far["expiration-enabled"], False)
1670 self.failUnlessIn("configured-expiration-mode", so_far)
1671 self.failUnlessIn("lease-age-histogram", so_far)
1672 lah = so_far["lease-age-histogram"]
1673 self.failUnlessEqual(type(lah), list)
1674 self.failUnlessEqual(len(lah), 1)
1675 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1676 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1677 self.failUnlessEqual(so_far["corrupt-shares"], [])
1678 sr1 = so_far["space-recovered"]
1679 self.failUnlessEqual(sr1["examined-buckets"], 1)
1680 self.failUnlessEqual(sr1["examined-shares"], 1)
1681 self.failUnlessEqual(sr1["actual-shares"], 0)
1682 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1683 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1684 left = initial_state["estimated-remaining-cycle"]
1685 sr2 = left["space-recovered"]
1686 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1687 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1688 self.failIfEqual(sr2["actual-shares"], None)
1689 self.failIfEqual(sr2["configured-diskbytes"], None)
1690 self.failIfEqual(sr2["original-sharebytes"], None)
1691 d.addCallback(_after_first_bucket)
1692 d.addCallback(lambda ign: self.render1(webstatus))
1693 def _check_html_in_cycle(html):
1694 s = remove_tags(html)
1695 self.failUnlessIn("So far, this cycle has examined "
1696 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1697 self.failUnlessIn("and has recovered: "
1698 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1699 "0 B (0 B / 0 B)", s)
1700 self.failUnlessIn("If expiration were enabled, "
1701 "we would have recovered: "
1702 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1703 " 0 B (0 B / 0 B) by now", s)
1704 self.failUnlessIn("and the remainder of this cycle "
1705 "would probably recover: "
1706 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1707 " 0 B (0 B / 0 B)", s)
1708 self.failUnlessIn("and the whole cycle would probably recover: "
1709 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1710 " 0 B (0 B / 0 B)", s)
1711 self.failUnlessIn("if we were strictly using each lease's default "
1712 "31-day lease lifetime", s)
1713 self.failUnlessIn("this cycle would be expected to recover: ", s)
1714 d.addCallback(_check_html_in_cycle)
1716 # wait for the crawler to finish the first cycle. Nothing should have
1719 return bool(lc.get_state()["last-cycle-finished"] is not None)
1720 d.addCallback(lambda ign: self.poll(_wait))
1722 def _after_first_cycle(ignored):
1724 self.failIf("cycle-to-date" in s)
1725 self.failIf("estimated-remaining-cycle" in s)
1726 self.failIf("estimated-current-cycle" in s)
1727 last = s["history"][0]
1728 self.failUnlessIn("cycle-start-finish-times", last)
1729 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1730 self.failUnlessEqual(last["expiration-enabled"], False)
1731 self.failUnlessIn("configured-expiration-mode", last)
1733 self.failUnlessIn("lease-age-histogram", last)
1734 lah = last["lease-age-histogram"]
1735 self.failUnlessEqual(type(lah), list)
1736 self.failUnlessEqual(len(lah), 1)
1737 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1739 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1740 self.failUnlessEqual(last["corrupt-shares"], [])
1742 rec = last["space-recovered"]
1743 self.failUnlessEqual(rec["examined-buckets"], 4)
1744 self.failUnlessEqual(rec["examined-shares"], 4)
1745 self.failUnlessEqual(rec["actual-buckets"], 0)
1746 self.failUnlessEqual(rec["original-buckets"], 0)
1747 self.failUnlessEqual(rec["configured-buckets"], 0)
1748 self.failUnlessEqual(rec["actual-shares"], 0)
1749 self.failUnlessEqual(rec["original-shares"], 0)
1750 self.failUnlessEqual(rec["configured-shares"], 0)
1751 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1752 self.failUnlessEqual(rec["original-diskbytes"], 0)
1753 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1754 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1755 self.failUnlessEqual(rec["original-sharebytes"], 0)
1756 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1758 def _get_sharefile(si):
1759 return list(ss._iter_share_files(si))[0]
1760 def count_leases(si):
1761 return len(list(_get_sharefile(si).get_leases()))
1762 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1763 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1764 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1765 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1766 d.addCallback(_after_first_cycle)
1767 d.addCallback(lambda ign: self.render1(webstatus))
1768 def _check_html(html):
1769 s = remove_tags(html)
1770 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1771 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1772 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1773 "(2 mutable / 2 immutable),", s)
1774 self.failUnlessIn("but expiration was not enabled", s)
1775 d.addCallback(_check_html)
1776 d.addCallback(lambda ign: self.render_json(webstatus))
1777 def _check_json(json):
1778 data = simplejson.loads(json)
1779 self.failUnlessIn("lease-checker", data)
1780 self.failUnlessIn("lease-checker-progress", data)
1781 d.addCallback(_check_json)
1784 def backdate_lease(self, sf, renew_secret, new_expire_time):
1785 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1786 # "renew" a lease with a new_expire_time that is older than what the
1787 # current lease has), so we have to reach inside it.
1788 for i,lease in enumerate(sf.get_leases()):
1789 if lease.renew_secret == renew_secret:
1790 lease.expiration_time = new_expire_time
1791 f = open(sf.home, 'rb+')
1792 sf._write_lease_record(f, i, lease)
1795 raise IndexError("unable to renew non-existent lease")
1797 def test_expire_age(self):
1798 basedir = "storage/LeaseCrawler/expire_age"
1799 fileutil.make_dirs(basedir)
1800 # setting expiration_time to 2000 means that any lease which is more
1801 # than 2000s old will be expired.
1802 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1803 expiration_enabled=True,
1804 expiration_mode="age",
1805 expiration_override_lease_duration=2000)
1806 # make it start sooner than usual.
1807 lc = ss.lease_checker
1809 lc.stop_after_first_bucket = True
1810 webstatus = StorageStatus(ss)
1812 # create a few shares, with some leases on them
1813 self.make_shares(ss)
1814 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1816 def count_shares(si):
1817 return len(list(ss._iter_share_files(si)))
1818 def _get_sharefile(si):
1819 return list(ss._iter_share_files(si))[0]
1820 def count_leases(si):
1821 return len(list(_get_sharefile(si).get_leases()))
1823 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1824 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1825 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1826 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1827 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1828 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1829 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1830 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1832 # artificially crank back the expiration time on the first lease of
1833 # each share, to make it look like it expired already (age=1000s).
1834 # Some shares have an extra lease which is set to expire at the
1835 # default time in 31 days from now (age=31days). We then run the
1836 # crawler, which will expire the first lease, making some shares get
1837 # deleted and others stay alive (with one remaining lease)
1840 sf0 = _get_sharefile(immutable_si_0)
1841 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1842 sf0_size = os.stat(sf0.home).st_size
1844 # immutable_si_1 gets an extra lease
1845 sf1 = _get_sharefile(immutable_si_1)
1846 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1848 sf2 = _get_sharefile(mutable_si_2)
1849 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1850 sf2_size = os.stat(sf2.home).st_size
1852 # mutable_si_3 gets an extra lease
1853 sf3 = _get_sharefile(mutable_si_3)
1854 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1856 ss.setServiceParent(self.s)
1858 d = fireEventually()
1859 # examine the state right after the first bucket has been processed
1860 def _after_first_bucket(ignored):
1861 p = lc.get_progress()
1862 if not p["cycle-in-progress"]:
1863 d2 = fireEventually()
1864 d2.addCallback(_after_first_bucket)
1866 d.addCallback(_after_first_bucket)
1867 d.addCallback(lambda ign: self.render1(webstatus))
1868 def _check_html_in_cycle(html):
1869 s = remove_tags(html)
1870 # the first bucket encountered gets deleted, and its prefix
1871 # happens to be about 1/5th of the way through the ring, so the
1872 # predictor thinks we'll have 5 shares and that we'll delete them
1873 # all. This part of the test depends upon the SIs landing right
1874 # where they do now.
1875 self.failUnlessIn("The remainder of this cycle is expected to "
1876 "recover: 4 shares, 4 buckets", s)
1877 self.failUnlessIn("The whole cycle is expected to examine "
1878 "5 shares in 5 buckets and to recover: "
1879 "5 shares, 5 buckets", s)
1880 d.addCallback(_check_html_in_cycle)
1882 # wait for the crawler to finish the first cycle. Two shares should
1885 return bool(lc.get_state()["last-cycle-finished"] is not None)
1886 d.addCallback(lambda ign: self.poll(_wait))
1888 def _after_first_cycle(ignored):
1889 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1890 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1891 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1892 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1893 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1894 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1897 last = s["history"][0]
1899 self.failUnlessEqual(last["expiration-enabled"], True)
1900 self.failUnlessEqual(last["configured-expiration-mode"],
1901 ("age", 2000, None, ("mutable", "immutable")))
1902 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1904 rec = last["space-recovered"]
1905 self.failUnlessEqual(rec["examined-buckets"], 4)
1906 self.failUnlessEqual(rec["examined-shares"], 4)
1907 self.failUnlessEqual(rec["actual-buckets"], 2)
1908 self.failUnlessEqual(rec["original-buckets"], 2)
1909 self.failUnlessEqual(rec["configured-buckets"], 2)
1910 self.failUnlessEqual(rec["actual-shares"], 2)
1911 self.failUnlessEqual(rec["original-shares"], 2)
1912 self.failUnlessEqual(rec["configured-shares"], 2)
1913 size = sf0_size + sf2_size
1914 self.failUnlessEqual(rec["actual-sharebytes"], size)
1915 self.failUnlessEqual(rec["original-sharebytes"], size)
1916 self.failUnlessEqual(rec["configured-sharebytes"], size)
1917 # different platforms have different notions of "blocks used by
1918 # this file", so merely assert that it's a number
1919 self.failUnless(rec["actual-diskbytes"] >= 0,
1920 rec["actual-diskbytes"])
1921 self.failUnless(rec["original-diskbytes"] >= 0,
1922 rec["original-diskbytes"])
1923 self.failUnless(rec["configured-diskbytes"] >= 0,
1924 rec["configured-diskbytes"])
1925 d.addCallback(_after_first_cycle)
1926 d.addCallback(lambda ign: self.render1(webstatus))
1927 def _check_html(html):
1928 s = remove_tags(html)
1929 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1930 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1931 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1932 d.addCallback(_check_html)
1935 def test_expire_cutoff_date(self):
1936 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1937 fileutil.make_dirs(basedir)
1938 # setting cutoff-date to 2000 seconds ago means that any lease which
1939 # is more than 2000s old will be expired.
1941 then = int(now - 2000)
1942 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1943 expiration_enabled=True,
1944 expiration_mode="cutoff-date",
1945 expiration_cutoff_date=then)
1946 # make it start sooner than usual.
1947 lc = ss.lease_checker
1949 lc.stop_after_first_bucket = True
1950 webstatus = StorageStatus(ss)
1952 # create a few shares, with some leases on them
1953 self.make_shares(ss)
1954 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1956 def count_shares(si):
1957 return len(list(ss._iter_share_files(si)))
1958 def _get_sharefile(si):
1959 return list(ss._iter_share_files(si))[0]
1960 def count_leases(si):
1961 return len(list(_get_sharefile(si).get_leases()))
1963 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1964 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1965 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1966 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1967 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1968 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1969 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1970 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1972 # artificially crank back the expiration time on the first lease of
1973 # each share, to make it look like was renewed 3000s ago. To achieve
1974 # this, we need to set the expiration time to now-3000+31days. This
1975 # will change when the lease format is improved to contain both
1976 # create/renew time and duration.
1977 new_expiration_time = now - 3000 + 31*24*60*60
1979 # Some shares have an extra lease which is set to expire at the
1980 # default time in 31 days from now (age=31days). We then run the
1981 # crawler, which will expire the first lease, making some shares get
1982 # deleted and others stay alive (with one remaining lease)
1984 sf0 = _get_sharefile(immutable_si_0)
1985 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1986 sf0_size = os.stat(sf0.home).st_size
1988 # immutable_si_1 gets an extra lease
1989 sf1 = _get_sharefile(immutable_si_1)
1990 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1992 sf2 = _get_sharefile(mutable_si_2)
1993 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1994 sf2_size = os.stat(sf2.home).st_size
1996 # mutable_si_3 gets an extra lease
1997 sf3 = _get_sharefile(mutable_si_3)
1998 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2000 ss.setServiceParent(self.s)
2002 d = fireEventually()
2003 # examine the state right after the first bucket has been processed
2004 def _after_first_bucket(ignored):
2005 p = lc.get_progress()
2006 if not p["cycle-in-progress"]:
2007 d2 = fireEventually()
2008 d2.addCallback(_after_first_bucket)
2010 d.addCallback(_after_first_bucket)
2011 d.addCallback(lambda ign: self.render1(webstatus))
2012 def _check_html_in_cycle(html):
2013 s = remove_tags(html)
2014 # the first bucket encountered gets deleted, and its prefix
2015 # happens to be about 1/5th of the way through the ring, so the
2016 # predictor thinks we'll have 5 shares and that we'll delete them
2017 # all. This part of the test depends upon the SIs landing right
2018 # where they do now.
2019 self.failUnlessIn("The remainder of this cycle is expected to "
2020 "recover: 4 shares, 4 buckets", s)
2021 self.failUnlessIn("The whole cycle is expected to examine "
2022 "5 shares in 5 buckets and to recover: "
2023 "5 shares, 5 buckets", s)
2024 d.addCallback(_check_html_in_cycle)
2026 # wait for the crawler to finish the first cycle. Two shares should
2029 return bool(lc.get_state()["last-cycle-finished"] is not None)
2030 d.addCallback(lambda ign: self.poll(_wait))
2032 def _after_first_cycle(ignored):
2033 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2034 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2035 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2036 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2037 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2038 self.failUnlessEqual(count_leases(mutable_si_3), 1)
2041 last = s["history"][0]
2043 self.failUnlessEqual(last["expiration-enabled"], True)
2044 self.failUnlessEqual(last["configured-expiration-mode"],
2045 ("cutoff-date", None, then,
2046 ("mutable", "immutable")))
2047 self.failUnlessEqual(last["leases-per-share-histogram"],
2050 rec = last["space-recovered"]
2051 self.failUnlessEqual(rec["examined-buckets"], 4)
2052 self.failUnlessEqual(rec["examined-shares"], 4)
2053 self.failUnlessEqual(rec["actual-buckets"], 2)
2054 self.failUnlessEqual(rec["original-buckets"], 0)
2055 self.failUnlessEqual(rec["configured-buckets"], 2)
2056 self.failUnlessEqual(rec["actual-shares"], 2)
2057 self.failUnlessEqual(rec["original-shares"], 0)
2058 self.failUnlessEqual(rec["configured-shares"], 2)
2059 size = sf0_size + sf2_size
2060 self.failUnlessEqual(rec["actual-sharebytes"], size)
2061 self.failUnlessEqual(rec["original-sharebytes"], 0)
2062 self.failUnlessEqual(rec["configured-sharebytes"], size)
2063 # different platforms have different notions of "blocks used by
2064 # this file", so merely assert that it's a number
2065 self.failUnless(rec["actual-diskbytes"] >= 0,
2066 rec["actual-diskbytes"])
2067 self.failUnless(rec["original-diskbytes"] >= 0,
2068 rec["original-diskbytes"])
2069 self.failUnless(rec["configured-diskbytes"] >= 0,
2070 rec["configured-diskbytes"])
2071 d.addCallback(_after_first_cycle)
2072 d.addCallback(lambda ign: self.render1(webstatus))
2073 def _check_html(html):
2074 s = remove_tags(html)
2075 self.failUnlessIn("Expiration Enabled:"
2076 " expired leases will be removed", s)
2077 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2078 substr = "Leases created or last renewed before %s will be considered expired." % date
2079 self.failUnlessIn(substr, s)
2080 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2081 d.addCallback(_check_html)
2084 def test_only_immutable(self):
2085 basedir = "storage/LeaseCrawler/only_immutable"
2086 fileutil.make_dirs(basedir)
2088 then = int(now - 2000)
2089 ss = StorageServer(basedir, "\x00" * 20,
2090 expiration_enabled=True,
2091 expiration_mode="cutoff-date",
2092 expiration_cutoff_date=then,
2093 expiration_sharetypes=("immutable",))
2094 lc = ss.lease_checker
2096 webstatus = StorageStatus(ss)
2098 self.make_shares(ss)
2099 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2100 # set all leases to be expirable
2101 new_expiration_time = now - 3000 + 31*24*60*60
2103 def count_shares(si):
2104 return len(list(ss._iter_share_files(si)))
2105 def _get_sharefile(si):
2106 return list(ss._iter_share_files(si))[0]
2107 def count_leases(si):
2108 return len(list(_get_sharefile(si).get_leases()))
2110 sf0 = _get_sharefile(immutable_si_0)
2111 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2112 sf1 = _get_sharefile(immutable_si_1)
2113 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2114 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2115 sf2 = _get_sharefile(mutable_si_2)
2116 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2117 sf3 = _get_sharefile(mutable_si_3)
2118 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2119 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2121 ss.setServiceParent(self.s)
2123 return bool(lc.get_state()["last-cycle-finished"] is not None)
2124 d = self.poll(_wait)
2126 def _after_first_cycle(ignored):
2127 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2128 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2129 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2130 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2131 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2132 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2133 d.addCallback(_after_first_cycle)
2134 d.addCallback(lambda ign: self.render1(webstatus))
2135 def _check_html(html):
2136 s = remove_tags(html)
2137 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2138 d.addCallback(_check_html)
2141 def test_only_mutable(self):
2142 basedir = "storage/LeaseCrawler/only_mutable"
2143 fileutil.make_dirs(basedir)
2145 then = int(now - 2000)
2146 ss = StorageServer(basedir, "\x00" * 20,
2147 expiration_enabled=True,
2148 expiration_mode="cutoff-date",
2149 expiration_cutoff_date=then,
2150 expiration_sharetypes=("mutable",))
2151 lc = ss.lease_checker
2153 webstatus = StorageStatus(ss)
2155 self.make_shares(ss)
2156 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2157 # set all leases to be expirable
2158 new_expiration_time = now - 3000 + 31*24*60*60
2160 def count_shares(si):
2161 return len(list(ss._iter_share_files(si)))
2162 def _get_sharefile(si):
2163 return list(ss._iter_share_files(si))[0]
2164 def count_leases(si):
2165 return len(list(_get_sharefile(si).get_leases()))
2167 sf0 = _get_sharefile(immutable_si_0)
2168 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2169 sf1 = _get_sharefile(immutable_si_1)
2170 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2171 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2172 sf2 = _get_sharefile(mutable_si_2)
2173 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2174 sf3 = _get_sharefile(mutable_si_3)
2175 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2176 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2178 ss.setServiceParent(self.s)
2180 return bool(lc.get_state()["last-cycle-finished"] is not None)
2181 d = self.poll(_wait)
2183 def _after_first_cycle(ignored):
2184 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2185 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2186 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2187 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2188 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2189 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2190 d.addCallback(_after_first_cycle)
2191 d.addCallback(lambda ign: self.render1(webstatus))
2192 def _check_html(html):
2193 s = remove_tags(html)
2194 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2195 d.addCallback(_check_html)
2198 def test_bad_mode(self):
2199 basedir = "storage/LeaseCrawler/bad_mode"
2200 fileutil.make_dirs(basedir)
2201 e = self.failUnlessRaises(ValueError,
2202 StorageServer, basedir, "\x00" * 20,
2203 expiration_mode="bogus")
2204 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2206 def test_parse_duration(self):
2210 p = time_format.parse_duration
2211 self.failUnlessEqual(p("7days"), 7*DAY)
2212 self.failUnlessEqual(p("31day"), 31*DAY)
2213 self.failUnlessEqual(p("60 days"), 60*DAY)
2214 self.failUnlessEqual(p("2mo"), 2*MONTH)
2215 self.failUnlessEqual(p("3 month"), 3*MONTH)
2216 self.failUnlessEqual(p("2years"), 2*YEAR)
2217 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2218 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2220 def test_parse_date(self):
2221 p = time_format.parse_date
2222 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2223 self.failUnlessEqual(p("2009-03-18"), 1237334400)
2225 def test_limited_history(self):
2226 basedir = "storage/LeaseCrawler/limited_history"
2227 fileutil.make_dirs(basedir)
2228 ss = StorageServer(basedir, "\x00" * 20)
2229 # make it start sooner than usual.
2230 lc = ss.lease_checker
2234 # create a few shares, with some leases on them
2235 self.make_shares(ss)
2237 ss.setServiceParent(self.s)
2239 def _wait_until_15_cycles_done():
2240 last = lc.state["last-cycle-finished"]
2241 if last is not None and last >= 15:
2246 d = self.poll(_wait_until_15_cycles_done)
2248 def _check(ignored):
2251 self.failUnlessEqual(len(h), 10)
2252 self.failUnlessEqual(max(h.keys()), 15)
2253 self.failUnlessEqual(min(h.keys()), 6)
2254 d.addCallback(_check)
2257 def test_unpredictable_future(self):
2258 basedir = "storage/LeaseCrawler/unpredictable_future"
2259 fileutil.make_dirs(basedir)
2260 ss = StorageServer(basedir, "\x00" * 20)
2261 # make it start sooner than usual.
2262 lc = ss.lease_checker
2264 lc.cpu_slice = -1.0 # stop quickly
2266 self.make_shares(ss)
2268 ss.setServiceParent(self.s)
2270 d = fireEventually()
2271 def _check(ignored):
2272 # this should fire after the first bucket is complete, but before
2273 # the first prefix is complete, so the progress-measurer won't
2274 # think we've gotten far enough to raise our percent-complete
2275 # above 0%, triggering the cannot-predict-the-future code in
2276 # expirer.py . This will have to change if/when the
2277 # progress-measurer gets smart enough to count buckets (we'll
2278 # have to interrupt it even earlier, before it's finished the
2281 if "cycle-to-date" not in s:
2282 d2 = fireEventually()
2283 d2.addCallback(_check)
2285 self.failUnlessIn("cycle-to-date", s)
2286 self.failUnlessIn("estimated-remaining-cycle", s)
2287 self.failUnlessIn("estimated-current-cycle", s)
2289 left = s["estimated-remaining-cycle"]["space-recovered"]
2290 self.failUnlessEqual(left["actual-buckets"], None)
2291 self.failUnlessEqual(left["original-buckets"], None)
2292 self.failUnlessEqual(left["configured-buckets"], None)
2293 self.failUnlessEqual(left["actual-shares"], None)
2294 self.failUnlessEqual(left["original-shares"], None)
2295 self.failUnlessEqual(left["configured-shares"], None)
2296 self.failUnlessEqual(left["actual-diskbytes"], None)
2297 self.failUnlessEqual(left["original-diskbytes"], None)
2298 self.failUnlessEqual(left["configured-diskbytes"], None)
2299 self.failUnlessEqual(left["actual-sharebytes"], None)
2300 self.failUnlessEqual(left["original-sharebytes"], None)
2301 self.failUnlessEqual(left["configured-sharebytes"], None)
2303 full = s["estimated-remaining-cycle"]["space-recovered"]
2304 self.failUnlessEqual(full["actual-buckets"], None)
2305 self.failUnlessEqual(full["original-buckets"], None)
2306 self.failUnlessEqual(full["configured-buckets"], None)
2307 self.failUnlessEqual(full["actual-shares"], None)
2308 self.failUnlessEqual(full["original-shares"], None)
2309 self.failUnlessEqual(full["configured-shares"], None)
2310 self.failUnlessEqual(full["actual-diskbytes"], None)
2311 self.failUnlessEqual(full["original-diskbytes"], None)
2312 self.failUnlessEqual(full["configured-diskbytes"], None)
2313 self.failUnlessEqual(full["actual-sharebytes"], None)
2314 self.failUnlessEqual(full["original-sharebytes"], None)
2315 self.failUnlessEqual(full["configured-sharebytes"], None)
2317 d.addCallback(_check)
2320 def test_no_st_blocks(self):
2321 basedir = "storage/LeaseCrawler/no_st_blocks"
2322 fileutil.make_dirs(basedir)
2323 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2324 expiration_mode="age",
2325 expiration_override_lease_duration=-1000)
2326 # a negative expiration_time= means the "configured-"
2327 # space-recovered counts will be non-zero, since all shares will have
2330 # make it start sooner than usual.
2331 lc = ss.lease_checker
2334 self.make_shares(ss)
2335 ss.setServiceParent(self.s)
2337 return bool(lc.get_state()["last-cycle-finished"] is not None)
2338 d = self.poll(_wait)
2340 def _check(ignored):
2342 last = s["history"][0]
2343 rec = last["space-recovered"]
2344 self.failUnlessEqual(rec["configured-buckets"], 4)
2345 self.failUnlessEqual(rec["configured-shares"], 4)
2346 self.failUnless(rec["configured-sharebytes"] > 0,
2347 rec["configured-sharebytes"])
2348 # without the .st_blocks field in os.stat() results, we should be
2349 # reporting diskbytes==sharebytes
2350 self.failUnlessEqual(rec["configured-sharebytes"],
2351 rec["configured-diskbytes"])
2352 d.addCallback(_check)
2355 def test_share_corruption(self):
2356 self._poll_should_ignore_these_errors = [
2357 UnknownMutableContainerVersionError,
2358 UnknownImmutableContainerVersionError,
2360 basedir = "storage/LeaseCrawler/share_corruption"
2361 fileutil.make_dirs(basedir)
2362 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2363 w = StorageStatus(ss)
2364 # make it start sooner than usual.
2365 lc = ss.lease_checker
2366 lc.stop_after_first_bucket = True
2370 # create a few shares, with some leases on them
2371 self.make_shares(ss)
2373 # now corrupt one, and make sure the lease-checker keeps going
2374 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2375 first = min(self.sis)
2376 first_b32 = base32.b2a(first)
2377 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2380 f.write("BAD MAGIC")
2382 # if get_share_file() doesn't see the correct mutable magic, it
2383 # assumes the file is an immutable share, and then
2384 # immutable.ShareFile sees a bad version. So regardless of which kind
2385 # of share we corrupted, this will trigger an
2386 # UnknownImmutableContainerVersionError.
2388 # also create an empty bucket
2389 empty_si = base32.b2a("\x04"*16)
2390 empty_bucket_dir = os.path.join(ss.sharedir,
2391 storage_index_to_dir(empty_si))
2392 fileutil.make_dirs(empty_bucket_dir)
2394 ss.setServiceParent(self.s)
2396 d = fireEventually()
2398 # now examine the state right after the first bucket has been
2400 def _after_first_bucket(ignored):
2402 if "cycle-to-date" not in s:
2403 d2 = fireEventually()
2404 d2.addCallback(_after_first_bucket)
2406 so_far = s["cycle-to-date"]
2407 rec = so_far["space-recovered"]
2408 self.failUnlessEqual(rec["examined-buckets"], 1)
2409 self.failUnlessEqual(rec["examined-shares"], 0)
2410 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2411 d.addCallback(_after_first_bucket)
2413 d.addCallback(lambda ign: self.render_json(w))
2414 def _check_json(json):
2415 data = simplejson.loads(json)
2416 # grr. json turns all dict keys into strings.
2417 so_far = data["lease-checker"]["cycle-to-date"]
2418 corrupt_shares = so_far["corrupt-shares"]
2419 # it also turns all tuples into lists
2420 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2421 d.addCallback(_check_json)
2422 d.addCallback(lambda ign: self.render1(w))
2423 def _check_html(html):
2424 s = remove_tags(html)
2425 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2426 d.addCallback(_check_html)
2429 return bool(lc.get_state()["last-cycle-finished"] is not None)
2430 d.addCallback(lambda ign: self.poll(_wait))
2432 def _after_first_cycle(ignored):
2434 last = s["history"][0]
2435 rec = last["space-recovered"]
2436 self.failUnlessEqual(rec["examined-buckets"], 5)
2437 self.failUnlessEqual(rec["examined-shares"], 3)
2438 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2439 d.addCallback(_after_first_cycle)
2440 d.addCallback(lambda ign: self.render_json(w))
2441 def _check_json_history(json):
2442 data = simplejson.loads(json)
2443 last = data["lease-checker"]["history"]["0"]
2444 corrupt_shares = last["corrupt-shares"]
2445 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2446 d.addCallback(_check_json_history)
2447 d.addCallback(lambda ign: self.render1(w))
2448 def _check_html_history(html):
2449 s = remove_tags(html)
2450 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2451 d.addCallback(_check_html_history)
2454 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2455 UnknownImmutableContainerVersionError)
2460 def render_json(self, page):
2461 d = self.render1(page, args={"t": ["json"]})
2464 class NoDiskStatsServer(StorageServer):
2465 def get_disk_stats(self):
2466 raise AttributeError
2468 class BadDiskStatsServer(StorageServer):
2469 def get_disk_stats(self):
2472 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2475 self.s = service.MultiService()
2476 self.s.startService()
2478 return self.s.stopService()
2480 def test_no_server(self):
2481 w = StorageStatus(None)
2482 html = w.renderSynchronously()
2483 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
2485 def test_status(self):
2486 basedir = "storage/WebStatus/status"
2487 fileutil.make_dirs(basedir)
2488 ss = StorageServer(basedir, "\x00" * 20)
2489 ss.setServiceParent(self.s)
2490 w = StorageStatus(ss)
2492 def _check_html(html):
2493 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2494 s = remove_tags(html)
2495 self.failUnlessIn("Accepting new shares: Yes", s)
2496 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2497 d.addCallback(_check_html)
2498 d.addCallback(lambda ign: self.render_json(w))
2499 def _check_json(json):
2500 data = simplejson.loads(json)
2502 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2503 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2504 self.failUnlessIn("bucket-counter", data)
2505 self.failUnlessIn("lease-checker", data)
2506 d.addCallback(_check_json)
2509 def render_json(self, page):
2510 d = self.render1(page, args={"t": ["json"]})
2513 def test_status_no_disk_stats(self):
2514 # Some platforms may have no disk stats API. Make sure the code can handle that
2515 # (test runs on all platforms).
2516 basedir = "storage/WebStatus/status_no_disk_stats"
2517 fileutil.make_dirs(basedir)
2518 ss = NoDiskStatsServer(basedir, "\x00" * 20)
2519 ss.setServiceParent(self.s)
2520 w = StorageStatus(ss)
2521 html = w.renderSynchronously()
2522 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2523 s = remove_tags(html)
2524 self.failUnlessIn("Accepting new shares: Yes", s)
2525 self.failUnlessIn("Total disk space: ?", s)
2526 self.failUnlessIn("Space Available to Tahoe: ?", s)
2527 self.failUnless(ss.get_available_space() is None)
2529 def test_status_bad_disk_stats(self):
2530 # If the API to get disk stats exists but a call to it fails, then the status should
2531 # show that no shares will be accepted, and get_available_space() should be 0.
2532 basedir = "storage/WebStatus/status_bad_disk_stats"
2533 fileutil.make_dirs(basedir)
2534 ss = BadDiskStatsServer(basedir, "\x00" * 20)
2535 ss.setServiceParent(self.s)
2536 w = StorageStatus(ss)
2537 html = w.renderSynchronously()
2538 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2539 s = remove_tags(html)
2540 self.failUnlessIn("Accepting new shares: No", s)
2541 self.failUnlessIn("Total disk space: ?", s)
2542 self.failUnlessIn("Space Available to Tahoe: ?", s)
2543 self.failUnlessEqual(ss.get_available_space(), 0)
2545 def test_readonly(self):
2546 basedir = "storage/WebStatus/readonly"
2547 fileutil.make_dirs(basedir)
2548 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2549 ss.setServiceParent(self.s)
2550 w = StorageStatus(ss)
2551 html = w.renderSynchronously()
2552 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2553 s = remove_tags(html)
2554 self.failUnlessIn("Accepting new shares: No", s)
2556 def test_reserved(self):
2557 basedir = "storage/WebStatus/reserved"
2558 fileutil.make_dirs(basedir)
2559 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2560 ss.setServiceParent(self.s)
2561 w = StorageStatus(ss)
2562 html = w.renderSynchronously()
2563 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2564 s = remove_tags(html)
2565 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2567 def test_huge_reserved(self):
2568 basedir = "storage/WebStatus/reserved"
2569 fileutil.make_dirs(basedir)
2570 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2571 ss.setServiceParent(self.s)
2572 w = StorageStatus(ss)
2573 html = w.renderSynchronously()
2574 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2575 s = remove_tags(html)
2576 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2578 def test_util(self):
2579 w = StorageStatus(None)
2580 self.failUnlessEqual(w.render_space(None, None), "?")
2581 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2582 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2583 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2584 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2585 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)