1 import time, os.path, platform, stat, re, simplejson, struct
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.interfaces import BadWriteEnablerError
24 from allmydata.test.common import LoggingServiceParent
25 from allmydata.test.common_web import WebRenderingMixin
26 from allmydata.web.storage import StorageStatus, remove_prefix
def __init__(self, ignore_disconnectors=False):
    """Fake foolscap canary used by the storage tests.

    Disconnect callbacks are recorded in self.disconnectors; the
    ignore_disconnectors flag is stored as self.ignore for later checks.
    """
    self.ignore = ignore_disconnectors
    self.disconnectors = dict()
def notifyOnDisconnect(self, f, *args, **kwargs):
# Register f(*args, **kwargs) to be invoked on (simulated) disconnect.
# NOTE(review): intervening lines are missing from this view -- the marker
# 'm' used as the dict key is created in code not shown here (and
# self.ignore is presumably honored); confirm against the full file.
self.disconnectors[m] = (f, args, kwargs)
def dontNotifyOnDisconnect(self, marker):
# Unregister a callback previously added via notifyOnDisconnect().
# NOTE(review): lines are missing from this view -- the original presumably
# checks self.ignore before deleting; confirm against the full file.
del self.disconnectors[marker]
class FakeStatsProvider:
# Minimal stand-in for a stats provider: accepts counter increments and
# producer registrations and discards them.
def count(self, name, delta=1):
# NOTE(review): the method bodies (presumably 'pass') are elided from
# this view.
def register_producer(self, producer):
51 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Create storage/Bucket/<name> plus its tmp/ subdir.

    Returns a (incoming, final) pair of share-file paths; neither file is
    created here, only the directories that will contain them.
    """
    base = os.path.join("storage", "Bucket", name)
    tmp = os.path.join(base, "tmp")
    fileutil.make_dirs(base)
    fileutil.make_dirs(tmp)
    return os.path.join(tmp, "bucket"), os.path.join(base, "bucket")
60 def bucket_writer_closed(self, bw, consumed):
62 def add_latency(self, category, latency):
64 def count(self, name, delta=1):
69 renew_secret = os.urandom(32)
70 cancel_secret = os.urandom(32)
71 expiration_time = time.time() + 5000
72 return LeaseInfo(owner_num, renew_secret, cancel_secret,
73 expiration_time, "\x00" * 20)
75 def test_create(self):
76 incoming, final = self.make_workdir("test_create")
77 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
79 bw.remote_write(0, "a"*25)
80 bw.remote_write(25, "b"*25)
81 bw.remote_write(50, "c"*25)
82 bw.remote_write(75, "d"*7)
85 def test_readwrite(self):
86 incoming, final = self.make_workdir("test_readwrite")
87 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
89 bw.remote_write(0, "a"*25)
90 bw.remote_write(25, "b"*25)
91 bw.remote_write(50, "c"*7) # last block may be short
95 br = BucketReader(self, bw.finalhome)
96 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
97 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
98 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
def callRemote(self, methname, *args, **kwargs):
# Emulate a foolscap remote call: dispatch to self.target.remote_<methname>
# inside maybeDeferred so synchronous exceptions surface as errbacks.
# NOTE(review): one line is elided from this view -- the inner
# 'def _call():' statement; the two lines below are its body.
meth = getattr(self.target, "remote_" + methname)
return meth(*args, **kwargs)
return defer.maybeDeferred(_call)
108 class BucketProxy(unittest.TestCase):
109 def make_bucket(self, name, size):
110 basedir = os.path.join("storage", "BucketProxy", name)
111 incoming = os.path.join(basedir, "tmp", "bucket")
112 final = os.path.join(basedir, "bucket")
113 fileutil.make_dirs(basedir)
114 fileutil.make_dirs(os.path.join(basedir, "tmp"))
115 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
121 def make_lease(self):
123 renew_secret = os.urandom(32)
124 cancel_secret = os.urandom(32)
125 expiration_time = time.time() + 5000
126 return LeaseInfo(owner_num, renew_secret, cancel_secret,
127 expiration_time, "\x00" * 20)
129 def bucket_writer_closed(self, bw, consumed):
131 def add_latency(self, category, latency):
133 def count(self, name, delta=1):
136 def test_create(self):
137 bw, rb, sharefname = self.make_bucket("test_create", 500)
138 bp = WriteBucketProxy(rb,
143 uri_extension_size_max=500, nodeid=None)
144 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
146 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
147 # Let's pretend each share has 100 bytes of data, and that there are
148 # 4 segments (25 bytes each), and 8 shares total. So the two
149 # per-segment merkle trees (crypttext_hash_tree,
150 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
151 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
152 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
153 # long. That should make the whole share:
155 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
156 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
158 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
160 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
162 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
164 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
166 uri_extension = "s" + "E"*498 + "e"
168 bw, rb, sharefname = self.make_bucket(name, sharesize)
174 uri_extension_size_max=len(uri_extension),
178 d.addCallback(lambda res: bp.put_block(0, "a"*25))
179 d.addCallback(lambda res: bp.put_block(1, "b"*25))
180 d.addCallback(lambda res: bp.put_block(2, "c"*25))
181 d.addCallback(lambda res: bp.put_block(3, "d"*20))
182 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
183 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
184 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
185 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
186 d.addCallback(lambda res: bp.close())
188 # now read everything back
189 def _start_reading(res):
190 br = BucketReader(self, sharefname)
193 rbp = rbp_class(rb, peerid="abc", storage_index="")
194 self.failUnlessIn("to peer", repr(rbp))
195 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
197 d1 = rbp.get_block_data(0, 25, 25)
198 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
199 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
200 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
201 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
202 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
203 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
204 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
206 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
207 d1.addCallback(lambda res:
208 self.failUnlessEqual(res, crypttext_hashes))
209 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
210 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
211 d1.addCallback(lambda res: rbp.get_share_hashes())
212 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
213 d1.addCallback(lambda res: rbp.get_uri_extension())
214 d1.addCallback(lambda res:
215 self.failUnlessEqual(res, uri_extension))
219 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    """Round-trip a share through the v1 layout (0x24-byte header)."""
    return self._do_test_readwrite("test_readwrite_v1", 0x24,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    """Round-trip a share through the v2 layout (0x44-byte header)."""
    return self._do_test_readwrite("test_readwrite_v2", 0x44,
                                   WriteBucketProxy_v2, ReadBucketProxy)
231 class Server(unittest.TestCase):
234 self.sparent = LoggingServiceParent()
235 self.sparent.startService()
236 self._lease_secret = itertools.count()
238 return self.sparent.stopService()
def workdir(self, name):
# Per-test working directory under storage/Server/.
# NOTE(review): the 'return basedir' line appears to be elided from this
# view; callers use the return value.
basedir = os.path.join("storage", "Server", name)
def create(self, name, reserved_space=0, klass=StorageServer):
# Build a StorageServer (or the given subclass) rooted in workdir(name),
# with a fixed all-zeros nodeid and a fake stats provider, and parent it
# under self.sparent so tearDown stops it.
workdir = self.workdir(name)
ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
stats_provider=FakeStatsProvider())
ss.setServiceParent(self.sparent)
# NOTE(review): 'return ss' appears to be elided from this view; callers
# rely on the return value.
def test_create(self):
    """A StorageServer can be constructed and parented without error."""
    self.create("test_create")
def allocate(self, ss, storage_index, sharenums, size, canary=None):
# Helper: call remote_allocate_buckets with fresh lease secrets derived
# from the monotonically increasing self._lease_secret counter.
renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
# NOTE(review): a line is elided from this view -- presumably
# 'if not canary:' guarding the default below; confirm against the full file.
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
sharenums, size, canary)
263 def test_large_share(self):
264 syslow = platform.system().lower()
265 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
266 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
268 avail = fileutil.get_available_space('.', 512*2**20)
270 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
272 ss = self.create("test_large_share")
274 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
275 self.failUnlessEqual(already, set())
276 self.failUnlessEqual(set(writers.keys()), set([0]))
278 shnum, bucket = writers.items()[0]
279 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
280 bucket.remote_write(2**32, "ab")
281 bucket.remote_close()
283 readers = ss.remote_get_buckets("allocate")
284 reader = readers[shnum]
285 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
287 def test_dont_overfill_dirs(self):
289 This test asserts that if you add a second share whose storage index
290 share lots of leading bits with an extant share (but isn't the exact
291 same storage index), this won't add an entry to the share directory.
293 ss = self.create("test_dont_overfill_dirs")
294 already, writers = self.allocate(ss, "storageindex", [0], 10)
295 for i, wb in writers.items():
296 wb.remote_write(0, "%10d" % i)
298 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
300 children_of_storedir = set(os.listdir(storedir))
302 # Now store another one under another storageindex that has leading
303 # chars the same as the first storageindex.
304 already, writers = self.allocate(ss, "storageindey", [0], 10)
305 for i, wb in writers.items():
306 wb.remote_write(0, "%10d" % i)
308 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
310 new_children_of_storedir = set(os.listdir(storedir))
311 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
313 def test_remove_incoming(self):
314 ss = self.create("test_remove_incoming")
315 already, writers = self.allocate(ss, "vid", range(3), 10)
316 for i,wb in writers.items():
317 wb.remote_write(0, "%10d" % i)
319 incoming_share_dir = wb.incominghome
320 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
321 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
322 incoming_dir = os.path.dirname(incoming_prefix_dir)
323 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
324 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
325 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
327 def test_abort(self):
328 # remote_abort, when called on a writer, should make sure that
329 # the allocated size of the bucket is not counted by the storage
330 # server when accounting for space.
331 ss = self.create("test_abort")
332 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
333 self.failIfEqual(ss.allocated_size(), 0)
335 # Now abort the writers.
336 for writer in writers.itervalues():
337 writer.remote_abort()
338 self.failUnlessEqual(ss.allocated_size(), 0)
341 def test_allocate(self):
342 ss = self.create("test_allocate")
344 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
346 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
347 self.failUnlessEqual(already, set())
348 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
350 # while the buckets are open, they should not count as readable
351 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
354 for i,wb in writers.items():
355 wb.remote_write(0, "%25d" % i)
357 # aborting a bucket that was already closed is a no-op
360 # now they should be readable
361 b = ss.remote_get_buckets("allocate")
362 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
363 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
365 self.failUnlessIn("BucketReader", b_str)
366 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
368 # now if we ask about writing again, the server should offer those
369 # three buckets as already present. It should offer them even if we
370 # don't ask about those specific ones.
371 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
372 self.failUnlessEqual(already, set([0,1,2]))
373 self.failUnlessEqual(set(writers.keys()), set([3,4]))
375 # while those two buckets are open for writing, the server should
376 # refuse to offer them to uploaders
378 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
379 self.failUnlessEqual(already2, set([0,1,2]))
380 self.failUnlessEqual(set(writers2.keys()), set([5]))
382 # aborting the writes should remove the tempfiles
383 for i,wb in writers2.items():
385 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
386 self.failUnlessEqual(already2, set([0,1,2]))
387 self.failUnlessEqual(set(writers2.keys()), set([5]))
389 for i,wb in writers2.items():
391 for i,wb in writers.items():
394 def test_bad_container_version(self):
395 ss = self.create("test_bad_container_version")
396 a,w = self.allocate(ss, "si1", [0], 10)
397 w[0].remote_write(0, "\xff"*10)
400 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
403 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
406 ss.remote_get_buckets("allocate")
408 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
409 ss.remote_get_buckets, "si1")
410 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
412 def test_disconnect(self):
413 # simulate a disconnection
414 ss = self.create("test_disconnect")
415 canary = FakeCanary()
416 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
417 self.failUnlessEqual(already, set())
418 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
419 for (f,args,kwargs) in canary.disconnectors.values():
424 # that ought to delete the incoming shares
425 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
426 self.failUnlessEqual(already, set())
427 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
429 @mock.patch('allmydata.util.fileutil.get_disk_stats')
430 def test_reserved_space(self, mock_get_disk_stats):
432 mock_get_disk_stats.return_value = {
433 'free_for_nonroot': 15000,
434 'avail': max(15000 - reserved_space, 0),
437 ss = self.create("test_reserved_space", reserved_space=reserved_space)
438 # 15k available, 10k reserved, leaves 5k for shares
440 # a newly created and filled share incurs this much overhead, beyond
441 # the size we request.
443 LEASE_SIZE = 4+32+32+4
444 canary = FakeCanary(True)
445 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
446 self.failUnlessEqual(len(writers), 3)
447 # now the StorageServer should have 3000 bytes provisionally
448 # allocated, allowing only 2000 more to be claimed
449 self.failUnlessEqual(len(ss._active_writers), 3)
451 # allocating 1001-byte shares only leaves room for one
452 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
453 self.failUnlessEqual(len(writers2), 1)
454 self.failUnlessEqual(len(ss._active_writers), 4)
456 # we abandon the first set, so their provisional allocation should be
460 self.failUnlessEqual(len(ss._active_writers), 1)
461 # now we have a provisional allocation of 1001 bytes
463 # and we close the second set, so their provisional allocation should
464 # become real, long-term allocation, and grows to include the
466 for bw in writers2.values():
467 bw.remote_write(0, "a"*25)
472 self.failUnlessEqual(len(ss._active_writers), 0)
474 allocated = 1001 + OVERHEAD + LEASE_SIZE
476 # we have to manually increase available, since we're not doing real
478 mock_get_disk_stats.return_value = {
479 'free_for_nonroot': 15000 - allocated,
480 'avail': max(15000 - allocated - reserved_space, 0),
483 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
484 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
485 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
486 self.failUnlessEqual(len(writers3), 39)
487 self.failUnlessEqual(len(ss._active_writers), 39)
491 self.failUnlessEqual(len(ss._active_writers), 0)
492 ss.disownServiceParent()
496 basedir = self.workdir("test_seek_behavior")
497 fileutil.make_dirs(basedir)
498 filename = os.path.join(basedir, "testfile")
499 f = open(filename, "wb")
502 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
503 # files. mode="a" preserves previous contents but does not allow
504 # seeking-to-create-holes. mode="r+" allows both.
505 f = open(filename, "rb+")
509 filelen = os.stat(filename)[stat.ST_SIZE]
510 self.failUnlessEqual(filelen, 100+3)
511 f2 = open(filename, "rb")
512 self.failUnlessEqual(f2.read(5), "start")
515 def test_leases(self):
516 ss = self.create("test_leases")
517 canary = FakeCanary()
521 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
522 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
523 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
524 sharenums, size, canary)
525 self.failUnlessEqual(len(already), 0)
526 self.failUnlessEqual(len(writers), 5)
527 for wb in writers.values():
530 leases = list(ss.get_leases("si0"))
531 self.failUnlessEqual(len(leases), 1)
532 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
534 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
535 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
536 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
537 sharenums, size, canary)
538 for wb in writers.values():
541 # take out a second lease on si1
542 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
543 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
544 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
545 sharenums, size, canary)
546 self.failUnlessEqual(len(already), 5)
547 self.failUnlessEqual(len(writers), 0)
549 leases = list(ss.get_leases("si1"))
550 self.failUnlessEqual(len(leases), 2)
551 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
553 # and a third lease, using add-lease
554 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
555 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
556 ss.remote_add_lease("si1", rs2a, cs2a)
557 leases = list(ss.get_leases("si1"))
558 self.failUnlessEqual(len(leases), 3)
559 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
561 # add-lease on a missing storage index is silently ignored
562 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
564 # check that si0 is readable
565 readers = ss.remote_get_buckets("si0")
566 self.failUnlessEqual(len(readers), 5)
568 # renew the first lease. Only the proper renew_secret should work
569 ss.remote_renew_lease("si0", rs0)
570 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
571 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
573 # check that si0 is still readable
574 readers = ss.remote_get_buckets("si0")
575 self.failUnlessEqual(len(readers), 5)
578 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
579 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
580 ss.remote_cancel_lease("si0", cs0)
582 # si0 should now be gone
583 readers = ss.remote_get_buckets("si0")
584 self.failUnlessEqual(len(readers), 0)
585 # and the renew should no longer work
586 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
589 # cancel the first lease on si1, leaving the second and third in place
590 ss.remote_cancel_lease("si1", cs1)
591 readers = ss.remote_get_buckets("si1")
592 self.failUnlessEqual(len(readers), 5)
593 # the corresponding renew should no longer work
594 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
596 leases = list(ss.get_leases("si1"))
597 self.failUnlessEqual(len(leases), 2)
598 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
600 ss.remote_renew_lease("si1", rs2)
601 # cancelling the second and third should make it go away
602 ss.remote_cancel_lease("si1", cs2)
603 ss.remote_cancel_lease("si1", cs2a)
604 readers = ss.remote_get_buckets("si1")
605 self.failUnlessEqual(len(readers), 0)
606 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
607 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
608 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
610 leases = list(ss.get_leases("si1"))
611 self.failUnlessEqual(len(leases), 0)
614 # test overlapping uploads
615 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
616 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
617 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
618 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
619 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
620 sharenums, size, canary)
621 self.failUnlessEqual(len(already), 0)
622 self.failUnlessEqual(len(writers), 5)
623 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
624 sharenums, size, canary)
625 self.failUnlessEqual(len(already2), 0)
626 self.failUnlessEqual(len(writers2), 0)
627 for wb in writers.values():
630 leases = list(ss.get_leases("si3"))
631 self.failUnlessEqual(len(leases), 1)
633 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
634 sharenums, size, canary)
635 self.failUnlessEqual(len(already3), 5)
636 self.failUnlessEqual(len(writers3), 0)
638 leases = list(ss.get_leases("si3"))
639 self.failUnlessEqual(len(leases), 2)
641 def test_readonly(self):
642 workdir = self.workdir("test_readonly")
643 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
644 ss.setServiceParent(self.sparent)
646 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
647 self.failUnlessEqual(already, set())
648 self.failUnlessEqual(writers, {})
650 stats = ss.get_stats()
651 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
652 if "storage_server.disk_avail" in stats:
653 # Some platforms may not have an API to get disk stats.
654 # But if there are stats, readonly_storage means disk_avail=0
655 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
657 def test_discard(self):
658 # discard is really only used for other tests, but we test it anyways
659 workdir = self.workdir("test_discard")
660 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
661 ss.setServiceParent(self.sparent)
663 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
664 self.failUnlessEqual(already, set())
665 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
666 for i,wb in writers.items():
667 wb.remote_write(0, "%25d" % i)
669 # since we discard the data, the shares should be present but sparse.
670 # Since we write with some seeks, the data we read back will be all
672 b = ss.remote_get_buckets("vid")
673 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
674 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
676 def test_advise_corruption(self):
677 workdir = self.workdir("test_advise_corruption")
678 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
679 ss.setServiceParent(self.sparent)
681 si0_s = base32.b2a("si0")
682 ss.remote_advise_corrupt_share("immutable", "si0", 0,
683 "This share smells funny.\n")
684 reportdir = os.path.join(workdir, "corruption-advisories")
685 reports = os.listdir(reportdir)
686 self.failUnlessEqual(len(reports), 1)
687 report_si0 = reports[0]
688 self.failUnlessIn(si0_s, report_si0)
689 f = open(os.path.join(reportdir, report_si0), "r")
692 self.failUnlessIn("type: immutable", report)
693 self.failUnlessIn("storage_index: %s" % si0_s, report)
694 self.failUnlessIn("share_number: 0", report)
695 self.failUnlessIn("This share smells funny.", report)
697 # test the RIBucketWriter version too
698 si1_s = base32.b2a("si1")
699 already,writers = self.allocate(ss, "si1", [1], 75)
700 self.failUnlessEqual(already, set())
701 self.failUnlessEqual(set(writers.keys()), set([1]))
702 writers[1].remote_write(0, "data")
703 writers[1].remote_close()
705 b = ss.remote_get_buckets("si1")
706 self.failUnlessEqual(set(b.keys()), set([1]))
707 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
709 reports = os.listdir(reportdir)
710 self.failUnlessEqual(len(reports), 2)
711 report_si1 = [r for r in reports if si1_s in r][0]
712 f = open(os.path.join(reportdir, report_si1), "r")
715 self.failUnlessIn("type: immutable", report)
716 self.failUnlessIn("storage_index: %s" % si1_s, report)
717 self.failUnlessIn("share_number: 1", report)
718 self.failUnlessIn("This share tastes like dust.", report)
722 class MutableServer(unittest.TestCase):
725 self.sparent = LoggingServiceParent()
726 self._lease_secret = itertools.count()
728 return self.sparent.stopService()
def workdir(self, name):
# Per-test working directory under storage/MutableServer/.
# NOTE(review): the 'return basedir' line appears to be elided from this
# view; callers use the return value.
basedir = os.path.join("storage", "MutableServer", name)
def create(self, name):
# Build a StorageServer rooted in workdir(name) with an all-zeros nodeid,
# parented under self.sparent for cleanup in tearDown.
workdir = self.workdir(name)
ss = StorageServer(workdir, "\x00" * 20)
ss.setServiceParent(self.sparent)
# NOTE(review): 'return ss' appears to be elided from this view; callers
# rely on the return value.
def test_create(self):
    """A StorageServer can be constructed and parented without error."""
    self.create("test_create")
def write_enabler(self, we_tag):
    """Derive a deterministic write-enabler secret from we_tag."""
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Derive a deterministic lease renew-secret from tag (coerced to str)."""
    secret = hashutil.tagged_hash("renew_blah", str(tag))
    return secret
def cancel_secret(self, tag):
    """Derive a deterministic lease cancel-secret from tag (coerced to str)."""
    secret = hashutil.tagged_hash("cancel_blah", str(tag))
    return secret
def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
# Create an empty mutable slot (one share per entry in sharenums) via
# remote_slot_testv_and_readv_and_writev, then assert the write succeeded
# and returned no read data.
write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag)
rstaraw = ss.remote_slot_testv_and_readv_and_writev
# each share gets an empty test vector, empty write vector, no new length
testandwritev = dict( [ (shnum, ([], [], None) )
for shnum in sharenums ] )
rc = rstaraw(storage_index,
(write_enabler, renew_secret, cancel_secret),
# NOTE(review): lines elided from this view -- presumably the
# testandwritev dict and an empty read vector complete this call.
(did_write, readv_data) = rc
self.failUnless(did_write)
self.failUnless(isinstance(readv_data, dict))
self.failUnlessEqual(len(readv_data), 0)
769 def test_bad_magic(self):
770 ss = self.create("test_bad_magic")
771 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
772 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
777 read = ss.remote_slot_readv
778 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
779 read, "si1", [0], [(0,10)])
780 self.failUnlessIn(" had magic ", str(e))
781 self.failUnlessIn(" but we wanted ", str(e))
783 def test_container_size(self):
784 ss = self.create("test_container_size")
785 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
787 read = ss.remote_slot_readv
788 rstaraw = ss.remote_slot_testv_and_readv_and_writev
789 secrets = ( self.write_enabler("we1"),
790 self.renew_secret("we1"),
791 self.cancel_secret("we1") )
792 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
793 answer = rstaraw("si1", secrets,
794 {0: ([], [(0,data)], len(data)+12)},
796 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
798 # trying to make the container too large will raise an exception
799 TOOBIG = MutableShareFile.MAX_SIZE + 10
800 self.failUnlessRaises(DataTooLargeError,
801 rstaraw, "si1", secrets,
802 {0: ([], [(0,data)], TOOBIG)},
805 # it should be possible to make the container smaller, although at
806 # the moment this doesn't actually affect the share, unless the
807 # container size is dropped to zero, in which case the share is
809 answer = rstaraw("si1", secrets,
810 {0: ([], [(0,data)], len(data)+8)},
812 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
814 answer = rstaraw("si1", secrets,
815 {0: ([], [(0,data)], 0)},
817 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
819 read_answer = read("si1", [0], [(0,10)])
820 self.failUnlessEqual(read_answer, {})
822 def test_allocate(self):
823 ss = self.create("test_allocate")
824 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
827 read = ss.remote_slot_readv
828 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
830 self.failUnlessEqual(read("si1", [], [(0, 10)]),
831 {0: [""], 1: [""], 2: [""]})
832 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
836 secrets = ( self.write_enabler("we1"),
837 self.renew_secret("we1"),
838 self.cancel_secret("we1") )
839 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
840 write = ss.remote_slot_testv_and_readv_and_writev
841 answer = write("si1", secrets,
842 {0: ([], [(0,data)], None)},
844 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
846 self.failUnlessEqual(read("si1", [0], [(0,20)]),
847 {0: ["00000000001111111111"]})
848 self.failUnlessEqual(read("si1", [0], [(95,10)]),
850 #self.failUnlessEqual(s0.remote_get_length(), 100)
852 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
853 f = self.failUnlessRaises(BadWriteEnablerError,
854 write, "si1", bad_secrets,
856 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
858 # this testv should fail
859 answer = write("si1", secrets,
860 {0: ([(0, 12, "eq", "444444444444"),
861 (20, 5, "eq", "22222"),
868 self.failUnlessEqual(answer, (False,
869 {0: ["000000000011", "22222"],
873 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
876 answer = write("si1", secrets,
877 {0: ([(10, 5, "lt", "11111"),
884 self.failUnlessEqual(answer, (False,
889 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
892 def test_operators(self):
893 # test operators, the data we're comparing is '11111' in all cases.
894 # test both fail+pass, reset data after each one.
895 ss = self.create("test_operators")
897 secrets = ( self.write_enabler("we1"),
898 self.renew_secret("we1"),
899 self.cancel_secret("we1") )
900 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
901 write = ss.remote_slot_testv_and_readv_and_writev
902 read = ss.remote_slot_readv
905 write("si1", secrets,
906 {0: ([], [(0,data)], None)},
912 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
917 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
918 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
919 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
922 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
927 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
928 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
931 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
936 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
937 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
941 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
946 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
947 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
950 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
955 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
956 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
959 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
964 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
965 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
969 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
974 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
975 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
978 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
983 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
984 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
988 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
993 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
994 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
997 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1002 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1003 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1007 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1012 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1013 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1016 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1021 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1022 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1025 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1030 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1031 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1035 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1040 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1041 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1044 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1049 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1050 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1053 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1058 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1059 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1062 # finally, test some operators against empty shares
1063 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1068 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1069 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1072 def test_readv(self):
1073 ss = self.create("test_readv")
1074 secrets = ( self.write_enabler("we1"),
1075 self.renew_secret("we1"),
1076 self.cancel_secret("we1") )
1077 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1078 write = ss.remote_slot_testv_and_readv_and_writev
1079 read = ss.remote_slot_readv
1080 data = [("%d" % i) * 100 for i in range(3)]
1081 rc = write("si1", secrets,
1082 {0: ([], [(0,data[0])], None),
1083 1: ([], [(0,data[1])], None),
1084 2: ([], [(0,data[2])], None),
1086 self.failUnlessEqual(rc, (True, {}))
1088 answer = read("si1", [], [(0, 10)])
1089 self.failUnlessEqual(answer, {0: ["0"*10],
1093 def compare_leases_without_timestamps(self, leases_a, leases_b):
1094 self.failUnlessEqual(len(leases_a), len(leases_b))
1095 for i in range(len(leases_a)):
1098 self.failUnlessEqual(a.owner_num, b.owner_num)
1099 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1100 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1101 self.failUnlessEqual(a.nodeid, b.nodeid)
1103 def compare_leases(self, leases_a, leases_b):
1104 self.failUnlessEqual(len(leases_a), len(leases_b))
1105 for i in range(len(leases_a)):
1108 self.failUnlessEqual(a.owner_num, b.owner_num)
1109 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1110 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1111 self.failUnlessEqual(a.nodeid, b.nodeid)
1112 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1114 def test_leases(self):
1115 ss = self.create("test_leases")
1117 return ( self.write_enabler("we1"),
1118 self.renew_secret("we1-%d" % n),
1119 self.cancel_secret("we1-%d" % n) )
1120 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1121 write = ss.remote_slot_testv_and_readv_and_writev
1122 read = ss.remote_slot_readv
1123 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1124 self.failUnlessEqual(rc, (True, {}))
1126 # create a random non-numeric file in the bucket directory, to
1127 # exercise the code that's supposed to ignore those.
1128 bucket_dir = os.path.join(self.workdir("test_leases"),
1129 "shares", storage_index_to_dir("si1"))
1130 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1131 f.write("you ought to be ignoring me\n")
1134 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1135 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1137 # add-lease on a missing storage index is silently ignored
1138 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1140 # re-allocate the slots and use the same secrets, that should update
1142 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1143 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1146 ss.remote_renew_lease("si1", secrets(0)[1])
1147 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1149 # now allocate them with a bunch of different secrets, to trigger the
1150 # extended lease code. Use add_lease for one of them.
1151 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1152 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1153 secrets2 = secrets(2)
1154 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1155 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1156 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1157 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1158 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1160 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1162 # cancel one of them
1163 ss.remote_cancel_lease("si1", secrets(5)[2])
1164 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1166 all_leases = list(s0.get_leases())
1167 # and write enough data to expand the container, forcing the server
1168 # to move the leases
1169 write("si1", secrets(0),
1170 {0: ([], [(0,data)], 200), },
1173 # read back the leases, make sure they're still intact.
1174 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1176 ss.remote_renew_lease("si1", secrets(0)[1])
1177 ss.remote_renew_lease("si1", secrets(1)[1])
1178 ss.remote_renew_lease("si1", secrets(2)[1])
1179 ss.remote_renew_lease("si1", secrets(3)[1])
1180 ss.remote_renew_lease("si1", secrets(4)[1])
1181 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1182 # get a new copy of the leases, with the current timestamps. Reading
1183 # data and failing to renew/cancel leases should leave the timestamps
1185 all_leases = list(s0.get_leases())
1186 # renewing with a bogus token should prompt an error message
1188 # examine the exception thus raised, make sure the old nodeid is
1189 # present, to provide for share migration
1190 e = self.failUnlessRaises(IndexError,
1191 ss.remote_renew_lease, "si1",
1194 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1195 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1196 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1198 # same for cancelling
1199 self.failUnlessRaises(IndexError,
1200 ss.remote_cancel_lease, "si1",
1202 self.compare_leases(all_leases, list(s0.get_leases()))
1204 # reading shares should not modify the timestamp
1205 read("si1", [], [(0,200)])
1206 self.compare_leases(all_leases, list(s0.get_leases()))
1208 write("si1", secrets(0),
1209 {0: ([], [(200, "make me bigger")], None)}, [])
1210 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1212 write("si1", secrets(0),
1213 {0: ([], [(500, "make me really bigger")], None)}, [])
1214 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1216 # now cancel them all
1217 ss.remote_cancel_lease("si1", secrets(0)[2])
1218 ss.remote_cancel_lease("si1", secrets(1)[2])
1219 ss.remote_cancel_lease("si1", secrets(2)[2])
1220 ss.remote_cancel_lease("si1", secrets(3)[2])
1222 # the slot should still be there
1223 remaining_shares = read("si1", [], [(0,10)])
1224 self.failUnlessEqual(len(remaining_shares), 1)
1225 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1227 # cancelling a non-existent lease should raise an IndexError
1228 self.failUnlessRaises(IndexError,
1229 ss.remote_cancel_lease, "si1", "nonsecret")
1231 # and the slot should still be there
1232 remaining_shares = read("si1", [], [(0,10)])
1233 self.failUnlessEqual(len(remaining_shares), 1)
1234 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1236 ss.remote_cancel_lease("si1", secrets(4)[2])
1237 # now the slot should be gone
1238 no_shares = read("si1", [], [(0,10)])
1239 self.failUnlessEqual(no_shares, {})
1241 # cancelling a lease on a non-existent share should raise an IndexError
1242 self.failUnlessRaises(IndexError,
1243 ss.remote_cancel_lease, "si2", "nonsecret")
1245 def test_remove(self):
1246 ss = self.create("test_remove")
1247 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1249 readv = ss.remote_slot_readv
1250 writev = ss.remote_slot_testv_and_readv_and_writev
1251 secrets = ( self.write_enabler("we1"),
1252 self.renew_secret("we1"),
1253 self.cancel_secret("we1") )
1254 # delete sh0 by setting its size to zero
1255 answer = writev("si1", secrets,
1258 # the answer should mention all the shares that existed before the
1260 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1261 # but a new read should show only sh1 and sh2
1262 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1265 # delete sh1 by setting its size to zero
1266 answer = writev("si1", secrets,
1269 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1270 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273 # delete sh2 by setting its size to zero
1274 answer = writev("si1", secrets,
1277 self.failUnlessEqual(answer, (True, {2:[]}) )
1278 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1280 # and the bucket directory should now be gone
1281 si = base32.b2a("si1")
1282 # note: this is a detail of the storage server implementation, and
1283 # may change in the future
1285 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1286 bucketdir = os.path.join(prefixdir, si)
1287 self.failUnless(os.path.exists(prefixdir), prefixdir)
1288 self.failIf(os.path.exists(bucketdir), bucketdir)
1290 class Stats(unittest.TestCase):
1293 self.sparent = LoggingServiceParent()
1294 self._lease_secret = itertools.count()
1296 return self.sparent.stopService()
1298 def workdir(self, name):
1299 basedir = os.path.join("storage", "Server", name)
1302 def create(self, name):
1303 workdir = self.workdir(name)
1304 ss = StorageServer(workdir, "\x00" * 20)
1305 ss.setServiceParent(self.sparent)
1308 def test_latencies(self):
1309 ss = self.create("test_latencies")
1310 for i in range(10000):
1311 ss.add_latency("allocate", 1.0 * i)
1312 for i in range(1000):
1313 ss.add_latency("renew", 1.0 * i)
1315 ss.add_latency("write", 1.0 * i)
1317 ss.add_latency("cancel", 2.0 * i)
1318 ss.add_latency("get", 5.0)
1320 output = ss.get_latencies()
1322 self.failUnlessEqual(sorted(output.keys()),
1323 sorted(["allocate", "renew", "cancel", "write", "get"]))
1324 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1325 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1326 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1327 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1328 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1329 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1330 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1331 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1332 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1334 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1335 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1336 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
1337 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1338 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1339 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1340 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1341 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1342 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1344 self.failUnlessEqual(len(ss.latencies["write"]), 20)
1345 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
1346 self.failUnless(output["write"]["01_0_percentile"] is None, output)
1347 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
1348 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
1349 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
1350 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
1351 self.failUnless(output["write"]["99_0_percentile"] is None, output)
1352 self.failUnless(output["write"]["99_9_percentile"] is None, output)
1354 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1355 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1356 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
1357 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
1358 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1359 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1360 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
1361 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
1362 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
1364 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1365 self.failUnless(output["get"]["mean"] is None, output)
1366 self.failUnless(output["get"]["01_0_percentile"] is None, output)
1367 self.failUnless(output["get"]["10_0_percentile"] is None, output)
1368 self.failUnless(output["get"]["50_0_percentile"] is None, output)
1369 self.failUnless(output["get"]["90_0_percentile"] is None, output)
1370 self.failUnless(output["get"]["95_0_percentile"] is None, output)
1371 self.failUnless(output["get"]["99_0_percentile"] is None, output)
1372 self.failUnless(output["get"]["99_9_percentile"] is None, output)
1375 s = re.sub(r'<[^>]*>', ' ', s)
1376 s = re.sub(r'\s+', ' ', s)
1379 class MyBucketCountingCrawler(BucketCountingCrawler):
1380 def finished_prefix(self, cycle, prefix):
1381 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1383 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler.

    Used by tests that need to hook finished_prefix() via the crawler's
    hook_ds deferreds.
    """
    def add_bucket_counter(self):
        # Same wiring as the base class, but with the instrumented
        # crawler substituted in.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
1392 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1395 self.s = service.MultiService()
1396 self.s.startService()
1398 return self.s.stopService()
1400 def test_bucket_counter(self):
1401 basedir = "storage/BucketCounter/bucket_counter"
1402 fileutil.make_dirs(basedir)
1403 ss = StorageServer(basedir, "\x00" * 20)
1404 # to make sure we capture the bucket-counting-crawler in the middle
1405 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1406 # also make it start sooner than usual.
1407 ss.bucket_counter.slow_start = 0
1408 orig_cpu_slice = ss.bucket_counter.cpu_slice
1409 ss.bucket_counter.cpu_slice = 0
1410 ss.setServiceParent(self.s)
1412 w = StorageStatus(ss)
1414 # this sample is before the crawler has started doing anything
1415 html = w.renderSynchronously()
1416 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1417 s = remove_tags(html)
1418 self.failUnlessIn("Accepting new shares: Yes", s)
1419 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1420 self.failUnlessIn("Total buckets: Not computed yet", s)
1421 self.failUnlessIn("Next crawl in", s)
1423 # give the bucket-counting-crawler one tick to get started. The
1424 # cpu_slice=0 will force it to yield right after it processes the
1427 d = fireEventually()
1428 def _check(ignored):
1429 # are we really right after the first prefix?
1430 state = ss.bucket_counter.get_state()
1431 if state["last-complete-prefix"] is None:
1432 d2 = fireEventually()
1433 d2.addCallback(_check)
1435 self.failUnlessEqual(state["last-complete-prefix"],
1436 ss.bucket_counter.prefixes[0])
1437 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1438 html = w.renderSynchronously()
1439 s = remove_tags(html)
1440 self.failUnlessIn(" Current crawl ", s)
1441 self.failUnlessIn(" (next work in ", s)
1442 d.addCallback(_check)
1444 # now give it enough time to complete a full cycle
1446 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1447 d.addCallback(lambda ignored: self.poll(_watch))
1448 def _check2(ignored):
1449 ss.bucket_counter.cpu_slice = orig_cpu_slice
1450 html = w.renderSynchronously()
1451 s = remove_tags(html)
1452 self.failUnlessIn("Total buckets: 0 (the number of", s)
1453 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1454 d.addCallback(_check2)
1457 def test_bucket_counter_cleanup(self):
1458 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1459 fileutil.make_dirs(basedir)
1460 ss = StorageServer(basedir, "\x00" * 20)
1461 # to make sure we capture the bucket-counting-crawler in the middle
1462 # of a cycle, we reach in and reduce its maximum slice time to 0.
1463 ss.bucket_counter.slow_start = 0
1464 orig_cpu_slice = ss.bucket_counter.cpu_slice
1465 ss.bucket_counter.cpu_slice = 0
1466 ss.setServiceParent(self.s)
1468 d = fireEventually()
1470 def _after_first_prefix(ignored):
1471 state = ss.bucket_counter.state
1472 if state["last-complete-prefix"] is None:
1473 d2 = fireEventually()
1474 d2.addCallback(_after_first_prefix)
1476 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1477 # now sneak in and mess with its state, to make sure it cleans up
1478 # properly at the end of the cycle
1479 self.failUnlessEqual(state["last-complete-prefix"],
1480 ss.bucket_counter.prefixes[0])
1481 state["bucket-counts"][-12] = {}
1482 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1483 ss.bucket_counter.save_state()
1484 d.addCallback(_after_first_prefix)
1486 # now give it enough time to complete a cycle
1488 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1489 d.addCallback(lambda ignored: self.poll(_watch))
1490 def _check2(ignored):
1491 ss.bucket_counter.cpu_slice = orig_cpu_slice
1492 s = ss.bucket_counter.get_state()
1493 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1494 self.failIf("bogusprefix!" in s["storage-index-samples"],
1495 s["storage-index-samples"].keys())
1496 d.addCallback(_check2)
1499 def test_bucket_counter_eta(self):
1500 basedir = "storage/BucketCounter/bucket_counter_eta"
1501 fileutil.make_dirs(basedir)
1502 ss = MyStorageServer(basedir, "\x00" * 20)
1503 ss.bucket_counter.slow_start = 0
1504 # these will be fired inside finished_prefix()
1505 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1506 w = StorageStatus(ss)
1508 d = defer.Deferred()
1510 def _check_1(ignored):
1511 # no ETA is available yet
1512 html = w.renderSynchronously()
1513 s = remove_tags(html)
1514 self.failUnlessIn("complete (next work", s)
1516 def _check_2(ignored):
1517 # one prefix has finished, so an ETA based upon that elapsed time
1518 # should be available.
1519 html = w.renderSynchronously()
1520 s = remove_tags(html)
1521 self.failUnlessIn("complete (ETA ", s)
1523 def _check_3(ignored):
1524 # two prefixes have finished
1525 html = w.renderSynchronously()
1526 s = remove_tags(html)
1527 self.failUnlessIn("complete (ETA ", s)
1530 hooks[0].addCallback(_check_1).addErrback(d.errback)
1531 hooks[1].addCallback(_check_2).addErrback(d.errback)
1532 hooks[2].addCallback(_check_3).addErrback(d.errback)
1534 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """Lease crawler that can be told to pause right after its first bucket.

    Set stop_after_first_bucket = True before the crawl starts; the crawler
    will then yield immediately after processing one bucket, letting a test
    examine mid-cycle state.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        # Delegate to the real crawler first, then — if the one-shot pause
        # is armed — disarm it and exhaust the CPU slice so the crawler
        # yields right away.
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # Once the pause has been consumed, give subsequent passes a
        # generous slice so the rest of the cycle finishes quickly.
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
1548 class BrokenStatResults:
1550 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1553 bsr = BrokenStatResults()
1554 for attrname in dir(s):
1555 if attrname.startswith("_"):
1557 if attrname == "st_blocks":
1559 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    """StorageServer whose lease checker can be paused after one bucket.

    Swaps in InstrumentedLeaseCheckingCrawler so tests can inspect the
    crawler's mid-cycle state.
    """
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer whose lease checker sees stat() results lacking
    st_blocks, simulating platforms (e.g. some Windows builds) where that
    attribute is unavailable.
    """
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1567 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1570 self.s = service.MultiService()
1571 self.s.startService()
1573 return self.s.stopService()
1575 def make_shares(self, ss):
1577 return (si, hashutil.tagged_hash("renew", si),
1578 hashutil.tagged_hash("cancel", si))
1579 def make_mutable(si):
1580 return (si, hashutil.tagged_hash("renew", si),
1581 hashutil.tagged_hash("cancel", si),
1582 hashutil.tagged_hash("write-enabler", si))
1583 def make_extra_lease(si, num):
1584 return (hashutil.tagged_hash("renew-%d" % num, si),
1585 hashutil.tagged_hash("cancel-%d" % num, si))
1587 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1588 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1589 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1590 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1591 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1592 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1594 canary = FakeCanary()
1595 # note: 'tahoe debug dump-share' will not handle this file, since the
1596 # inner contents are not a valid CHK share
1597 data = "\xff" * 1000
1599 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1601 w[0].remote_write(0, data)
1604 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1606 w[0].remote_write(0, data)
1608 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1610 writev = ss.remote_slot_testv_and_readv_and_writev
1611 writev(mutable_si_2, (we2, rs2, cs2),
1612 {0: ([], [(0,data)], len(data))}, [])
1613 writev(mutable_si_3, (we3, rs3, cs3),
1614 {0: ([], [(0,data)], len(data))}, [])
1615 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1617 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1618 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1619 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1621 def test_basic(self):
1622 basedir = "storage/LeaseCrawler/basic"
1623 fileutil.make_dirs(basedir)
1624 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1625 # make it start sooner than usual.
1626 lc = ss.lease_checker
1629 lc.stop_after_first_bucket = True
1630 webstatus = StorageStatus(ss)
1632 # create a few shares, with some leases on them
1633 self.make_shares(ss)
1634 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1636 # add a non-sharefile to exercise another code path
1637 fn = os.path.join(ss.sharedir,
1638 storage_index_to_dir(immutable_si_0),
1641 f.write("I am not a share.\n")
1644 # this is before the crawl has started, so we're not in a cycle yet
1645 initial_state = lc.get_state()
1646 self.failIf(lc.get_progress()["cycle-in-progress"])
1647 self.failIfIn("cycle-to-date", initial_state)
1648 self.failIfIn("estimated-remaining-cycle", initial_state)
1649 self.failIfIn("estimated-current-cycle", initial_state)
1650 self.failUnlessIn("history", initial_state)
1651 self.failUnlessEqual(initial_state["history"], {})
1653 ss.setServiceParent(self.s)
1657 d = fireEventually()
1659 # now examine the state right after the first bucket has been
1661 def _after_first_bucket(ignored):
1662 initial_state = lc.get_state()
1663 if "cycle-to-date" not in initial_state:
1664 d2 = fireEventually()
1665 d2.addCallback(_after_first_bucket)
1667 self.failUnlessIn("cycle-to-date", initial_state)
1668 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1669 self.failUnlessIn("estimated-current-cycle", initial_state)
1670 self.failUnlessIn("history", initial_state)
1671 self.failUnlessEqual(initial_state["history"], {})
1673 so_far = initial_state["cycle-to-date"]
1674 self.failUnlessEqual(so_far["expiration-enabled"], False)
1675 self.failUnlessIn("configured-expiration-mode", so_far)
1676 self.failUnlessIn("lease-age-histogram", so_far)
1677 lah = so_far["lease-age-histogram"]
1678 self.failUnlessEqual(type(lah), list)
1679 self.failUnlessEqual(len(lah), 1)
1680 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1681 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1682 self.failUnlessEqual(so_far["corrupt-shares"], [])
1683 sr1 = so_far["space-recovered"]
1684 self.failUnlessEqual(sr1["examined-buckets"], 1)
1685 self.failUnlessEqual(sr1["examined-shares"], 1)
1686 self.failUnlessEqual(sr1["actual-shares"], 0)
1687 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1688 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1689 left = initial_state["estimated-remaining-cycle"]
1690 sr2 = left["space-recovered"]
1691 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1692 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1693 self.failIfEqual(sr2["actual-shares"], None)
1694 self.failIfEqual(sr2["configured-diskbytes"], None)
1695 self.failIfEqual(sr2["original-sharebytes"], None)
1696 d.addCallback(_after_first_bucket)
1697 d.addCallback(lambda ign: self.render1(webstatus))
1698 def _check_html_in_cycle(html):
1699 s = remove_tags(html)
1700 self.failUnlessIn("So far, this cycle has examined "
1701 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1702 self.failUnlessIn("and has recovered: "
1703 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1704 "0 B (0 B / 0 B)", s)
1705 self.failUnlessIn("If expiration were enabled, "
1706 "we would have recovered: "
1707 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1708 " 0 B (0 B / 0 B) by now", s)
1709 self.failUnlessIn("and the remainder of this cycle "
1710 "would probably recover: "
1711 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1712 " 0 B (0 B / 0 B)", s)
1713 self.failUnlessIn("and the whole cycle would probably recover: "
1714 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1715 " 0 B (0 B / 0 B)", s)
1716 self.failUnlessIn("if we were strictly using each lease's default "
1717 "31-day lease lifetime", s)
1718 self.failUnlessIn("this cycle would be expected to recover: ", s)
1719 d.addCallback(_check_html_in_cycle)
1721 # wait for the crawler to finish the first cycle. Nothing should have
1724 return bool(lc.get_state()["last-cycle-finished"] is not None)
1725 d.addCallback(lambda ign: self.poll(_wait))
1727 def _after_first_cycle(ignored):
1729 self.failIf("cycle-to-date" in s)
1730 self.failIf("estimated-remaining-cycle" in s)
1731 self.failIf("estimated-current-cycle" in s)
1732 last = s["history"][0]
1733 self.failUnlessIn("cycle-start-finish-times", last)
1734 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1735 self.failUnlessEqual(last["expiration-enabled"], False)
1736 self.failUnlessIn("configured-expiration-mode", last)
1738 self.failUnlessIn("lease-age-histogram", last)
1739 lah = last["lease-age-histogram"]
1740 self.failUnlessEqual(type(lah), list)
1741 self.failUnlessEqual(len(lah), 1)
1742 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1744 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1745 self.failUnlessEqual(last["corrupt-shares"], [])
1747 rec = last["space-recovered"]
1748 self.failUnlessEqual(rec["examined-buckets"], 4)
1749 self.failUnlessEqual(rec["examined-shares"], 4)
1750 self.failUnlessEqual(rec["actual-buckets"], 0)
1751 self.failUnlessEqual(rec["original-buckets"], 0)
1752 self.failUnlessEqual(rec["configured-buckets"], 0)
1753 self.failUnlessEqual(rec["actual-shares"], 0)
1754 self.failUnlessEqual(rec["original-shares"], 0)
1755 self.failUnlessEqual(rec["configured-shares"], 0)
1756 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1757 self.failUnlessEqual(rec["original-diskbytes"], 0)
1758 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1759 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1760 self.failUnlessEqual(rec["original-sharebytes"], 0)
1761 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1763 def _get_sharefile(si):
1764 return list(ss._iter_share_files(si))[0]
1765 def count_leases(si):
1766 return len(list(_get_sharefile(si).get_leases()))
1767 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1768 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1769 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1770 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1771 d.addCallback(_after_first_cycle)
1772 d.addCallback(lambda ign: self.render1(webstatus))
1773 def _check_html(html):
1774 s = remove_tags(html)
1775 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1776 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1777 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1778 "(2 mutable / 2 immutable),", s)
1779 self.failUnlessIn("but expiration was not enabled", s)
1780 d.addCallback(_check_html)
1781 d.addCallback(lambda ign: self.render_json(webstatus))
1782 def _check_json(json):
1783 data = simplejson.loads(json)
1784 self.failUnlessIn("lease-checker", data)
1785 self.failUnlessIn("lease-checker-progress", data)
1786 d.addCallback(_check_json)
1789 def backdate_lease(self, sf, renew_secret, new_expire_time):
1790 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1791 # "renew" a lease with a new_expire_time that is older than what the
1792 # current lease has), so we have to reach inside it.
1793 for i,lease in enumerate(sf.get_leases()):
1794 if lease.renew_secret == renew_secret:
1795 lease.expiration_time = new_expire_time
1796 f = open(sf.home, 'rb+')
1797 sf._write_lease_record(f, i, lease)
1800 raise IndexError("unable to renew non-existent lease")
1802 def test_expire_age(self):
1803 basedir = "storage/LeaseCrawler/expire_age"
1804 fileutil.make_dirs(basedir)
1805 # setting expiration_time to 2000 means that any lease which is more
1806 # than 2000s old will be expired.
1807 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1808 expiration_enabled=True,
1809 expiration_mode="age",
1810 expiration_override_lease_duration=2000)
1811 # make it start sooner than usual.
1812 lc = ss.lease_checker
1814 lc.stop_after_first_bucket = True
1815 webstatus = StorageStatus(ss)
1817 # create a few shares, with some leases on them
1818 self.make_shares(ss)
1819 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1821 def count_shares(si):
1822 return len(list(ss._iter_share_files(si)))
1823 def _get_sharefile(si):
1824 return list(ss._iter_share_files(si))[0]
1825 def count_leases(si):
1826 return len(list(_get_sharefile(si).get_leases()))
1828 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1829 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1830 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1831 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1832 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1833 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1834 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1835 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1837 # artificially crank back the expiration time on the first lease of
1838 # each share, to make it look like it expired already (age=1000s).
1839 # Some shares have an extra lease which is set to expire at the
1840 # default time in 31 days from now (age=31days). We then run the
1841 # crawler, which will expire the first lease, making some shares get
1842 # deleted and others stay alive (with one remaining lease)
1845 sf0 = _get_sharefile(immutable_si_0)
1846 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1847 sf0_size = os.stat(sf0.home).st_size
1849 # immutable_si_1 gets an extra lease
1850 sf1 = _get_sharefile(immutable_si_1)
1851 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1853 sf2 = _get_sharefile(mutable_si_2)
1854 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1855 sf2_size = os.stat(sf2.home).st_size
1857 # mutable_si_3 gets an extra lease
1858 sf3 = _get_sharefile(mutable_si_3)
1859 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1861 ss.setServiceParent(self.s)
1863 d = fireEventually()
1864 # examine the state right after the first bucket has been processed
1865 def _after_first_bucket(ignored):
1866 p = lc.get_progress()
1867 if not p["cycle-in-progress"]:
1868 d2 = fireEventually()
1869 d2.addCallback(_after_first_bucket)
1871 d.addCallback(_after_first_bucket)
1872 d.addCallback(lambda ign: self.render1(webstatus))
1873 def _check_html_in_cycle(html):
1874 s = remove_tags(html)
1875 # the first bucket encountered gets deleted, and its prefix
1876 # happens to be about 1/5th of the way through the ring, so the
1877 # predictor thinks we'll have 5 shares and that we'll delete them
1878 # all. This part of the test depends upon the SIs landing right
1879 # where they do now.
1880 self.failUnlessIn("The remainder of this cycle is expected to "
1881 "recover: 4 shares, 4 buckets", s)
1882 self.failUnlessIn("The whole cycle is expected to examine "
1883 "5 shares in 5 buckets and to recover: "
1884 "5 shares, 5 buckets", s)
1885 d.addCallback(_check_html_in_cycle)
1887 # wait for the crawler to finish the first cycle. Two shares should
1890 return bool(lc.get_state()["last-cycle-finished"] is not None)
1891 d.addCallback(lambda ign: self.poll(_wait))
1893 def _after_first_cycle(ignored):
1894 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1895 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1896 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1897 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1898 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1899 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1902 last = s["history"][0]
1904 self.failUnlessEqual(last["expiration-enabled"], True)
1905 self.failUnlessEqual(last["configured-expiration-mode"],
1906 ("age", 2000, None, ("mutable", "immutable")))
1907 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1909 rec = last["space-recovered"]
1910 self.failUnlessEqual(rec["examined-buckets"], 4)
1911 self.failUnlessEqual(rec["examined-shares"], 4)
1912 self.failUnlessEqual(rec["actual-buckets"], 2)
1913 self.failUnlessEqual(rec["original-buckets"], 2)
1914 self.failUnlessEqual(rec["configured-buckets"], 2)
1915 self.failUnlessEqual(rec["actual-shares"], 2)
1916 self.failUnlessEqual(rec["original-shares"], 2)
1917 self.failUnlessEqual(rec["configured-shares"], 2)
1918 size = sf0_size + sf2_size
1919 self.failUnlessEqual(rec["actual-sharebytes"], size)
1920 self.failUnlessEqual(rec["original-sharebytes"], size)
1921 self.failUnlessEqual(rec["configured-sharebytes"], size)
1922 # different platforms have different notions of "blocks used by
1923 # this file", so merely assert that it's a number
1924 self.failUnless(rec["actual-diskbytes"] >= 0,
1925 rec["actual-diskbytes"])
1926 self.failUnless(rec["original-diskbytes"] >= 0,
1927 rec["original-diskbytes"])
1928 self.failUnless(rec["configured-diskbytes"] >= 0,
1929 rec["configured-diskbytes"])
1930 d.addCallback(_after_first_cycle)
1931 d.addCallback(lambda ign: self.render1(webstatus))
1932 def _check_html(html):
1933 s = remove_tags(html)
1934 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1935 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1936 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1937 d.addCallback(_check_html)
    def test_expire_cutoff_date(self):
        """Lease expiration in "cutoff-date" mode.

        Makes four shares (two immutable, two mutable), backdates one lease
        on each so it looks renewed ~3000s ago, then runs the lease crawler
        with a cutoff of 2000s ago.  Shares whose only lease was backdated
        must be deleted; shares holding a second, fresh lease must survive
        with one lease left.  Also checks the mid-cycle web status, the
        end-of-cycle history record, and the final web status page.

        NOTE(review): several lines appear to be missing from this copy of
        the file (e.g. a `now = time.time()` assignment, a `def _wait():`
        header, `return d2`/`return d` lines, and the continuation of one
        assertion) -- the comments below describe only the visible code.
        """
        basedir = "storage/LeaseCrawler/expire_cutoff_date"
        fileutil.make_dirs(basedir)
        # setting cutoff-date to 2000 seconds ago means that any lease which
        # is more than 2000s old will be expired.
        then = int(now - 2000)  # `now` is presumably time.time() -- assigned on a non-visible line; confirm
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="cutoff-date",
                                       expiration_cutoff_date=then)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)
        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # small helpers over the server's share iterator
        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))
        # initial layout: one share per SI; *_1 and *_3 carry a second lease
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)
        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like was renewed 3000s ago. To achieve
        # this, we need to set the expiration time to now-3000+31days. This
        # will change when the lease format is improved to contain both
        # create/renew time and duration.
        new_expiration_time = now - 3000 + 31*24*60*60
        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf0_size = os.stat(sf0.home).st_size
        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf2_size = os.stat(sf2.home).st_size
        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        ss.setServiceParent(self.s)
        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            # reschedule ourselves until the crawler reports a cycle in progress
            p = lc.get_progress()
            if not p["cycle-in-progress"]:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)
        # wait for the crawler to finish the first cycle. Two shares should
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))
        def _after_first_cycle(ignored):
            # the two backdated single-lease shares are gone; the others
            # survive with exactly their one fresh lease
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)
            last = s["history"][0]
            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("cutoff-date", None, then,
                                  ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"],
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled:"
                              " expired leases will be removed", s)
            date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
            substr = "Leases created or last renewed before %s will be considered expired." % date
            self.failUnlessIn(substr, s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
    def test_only_immutable(self):
        """With expiration_sharetypes=("immutable",), only immutable shares
        are expired; mutable shares keep all of their (expirable) leases.

        NOTE(review): some lines are missing from this copy of the file
        (e.g. the `now = time.time()` assignment, the `def _wait():`
        header, and the final `return d`); comments describe only the
        visible code.
        """
        basedir = "storage/LeaseCrawler/only_immutable"
        fileutil.make_dirs(basedir)
        then = int(now - 2000)  # cutoff: 2000s ago; `now` presumably assigned on a non-visible line
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("immutable",))
        lc = ss.lease_checker
        webstatus = StorageStatus(ss)
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60
        # small helpers over the server's share iterator
        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))
        # backdate every lease on every share so all are past the cutoff
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)
        def _after_first_cycle(ignored):
            # immutable shares deleted; mutable shares untouched (all leases kept)
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 0)
            self.failUnlessEqual(count_shares(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
        d.addCallback(_check_html)
    def test_only_mutable(self):
        """With expiration_sharetypes=("mutable",), only mutable shares are
        expired; immutable shares keep all of their (expirable) leases.

        NOTE(review): some lines are missing from this copy of the file
        (e.g. the `now = time.time()` assignment, the `def _wait():`
        header, and the final `return d`); comments describe only the
        visible code.
        """
        basedir = "storage/LeaseCrawler/only_mutable"
        fileutil.make_dirs(basedir)
        then = int(now - 2000)  # cutoff: 2000s ago; `now` presumably assigned on a non-visible line
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("mutable",))
        lc = ss.lease_checker
        webstatus = StorageStatus(ss)
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60
        # small helpers over the server's share iterator
        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))
        # backdate every lease on every share so all are past the cutoff
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)
        def _after_first_cycle(ignored):
            # immutable shares untouched (all leases kept); mutable shares deleted
            self.failUnlessEqual(count_shares(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 0)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
        d.addCallback(_check_html)
2203 def test_bad_mode(self):
2204 basedir = "storage/LeaseCrawler/bad_mode"
2205 fileutil.make_dirs(basedir)
2206 e = self.failUnlessRaises(ValueError,
2207 StorageServer, basedir, "\x00" * 20,
2208 expiration_mode="bogus")
2209 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2211 def test_parse_duration(self):
2215 p = time_format.parse_duration
2216 self.failUnlessEqual(p("7days"), 7*DAY)
2217 self.failUnlessEqual(p("31day"), 31*DAY)
2218 self.failUnlessEqual(p("60 days"), 60*DAY)
2219 self.failUnlessEqual(p("2mo"), 2*MONTH)
2220 self.failUnlessEqual(p("3 month"), 3*MONTH)
2221 self.failUnlessEqual(p("2years"), 2*YEAR)
2222 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2223 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2225 def test_parse_date(self):
2226 p = time_format.parse_date
2227 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2228 self.failUnlessEqual(p("2009-03-18"), 1237334400)
    def test_limited_history(self):
        """The lease checker must retain only the 10 most recent cycle
        records in its history.

        NOTE(review): lines are missing from this copy of the file
        (crawler speed-up settings, the body of the `if` inside
        _wait_until_15_cycles_done, the assignment of `h`, and the final
        `return d`); comments describe only the visible code.
        """
        basedir = "storage/LeaseCrawler/limited_history"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        # create a few shares, with some leases on them
        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait_until_15_cycles_done():
            last = lc.state["last-cycle-finished"]
            if last is not None and last >= 15:
        d = self.poll(_wait_until_15_cycles_done)
        def _check(ignored):
            # after 15 cycles, only entries 6..15 (ten of them) survive
            # (`h` is presumably the history dict from lc.get_state())
            self.failUnlessEqual(len(h), 10)
            self.failUnlessEqual(max(h.keys()), 15)
            self.failUnlessEqual(min(h.keys()), 6)
        d.addCallback(_check)
    def test_unpredictable_future(self):
        """Early in a cycle (still at 0% complete) the expirer cannot
        predict remaining/whole-cycle totals, so every field of the
        "space-recovered" estimates must be None.

        NOTE(review): several lines are missing from this copy of the file
        (e.g. the `s = lc.get_state()` assignment, `return d2`, and the
        final `return d`); comments describe only the visible code.
        """
        basedir = "storage/LeaseCrawler/unpredictable_future"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.cpu_slice = -1.0 # stop quickly
        self.make_shares(ss)
        ss.setServiceParent(self.s)
        d = fireEventually()
        def _check(ignored):
            # this should fire after the first bucket is complete, but before
            # the first prefix is complete, so the progress-measurer won't
            # think we've gotten far enough to raise our percent-complete
            # above 0%, triggering the cannot-predict-the-future code in
            # expirer.py . This will have to change if/when the
            # progress-measurer gets smart enough to count buckets (we'll
            # have to interrupt it even earlier, before it's finished the
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessIn("cycle-to-date", s)
            self.failUnlessIn("estimated-remaining-cycle", s)
            self.failUnlessIn("estimated-current-cycle", s)
            left = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(left["actual-buckets"], None)
            self.failUnlessEqual(left["original-buckets"], None)
            self.failUnlessEqual(left["configured-buckets"], None)
            self.failUnlessEqual(left["actual-shares"], None)
            self.failUnlessEqual(left["original-shares"], None)
            self.failUnlessEqual(left["configured-shares"], None)
            self.failUnlessEqual(left["actual-diskbytes"], None)
            self.failUnlessEqual(left["original-diskbytes"], None)
            self.failUnlessEqual(left["configured-diskbytes"], None)
            self.failUnlessEqual(left["actual-sharebytes"], None)
            self.failUnlessEqual(left["original-sharebytes"], None)
            self.failUnlessEqual(left["configured-sharebytes"], None)
            # NOTE(review): `full` re-reads estimated-REMAINING-cycle; it
            # presumably was meant to read estimated-current-cycle -- confirm
            full = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(full["actual-buckets"], None)
            self.failUnlessEqual(full["original-buckets"], None)
            self.failUnlessEqual(full["configured-buckets"], None)
            self.failUnlessEqual(full["actual-shares"], None)
            self.failUnlessEqual(full["original-shares"], None)
            self.failUnlessEqual(full["configured-shares"], None)
            self.failUnlessEqual(full["actual-diskbytes"], None)
            self.failUnlessEqual(full["original-diskbytes"], None)
            self.failUnlessEqual(full["configured-diskbytes"], None)
            self.failUnlessEqual(full["actual-sharebytes"], None)
            self.failUnlessEqual(full["original-sharebytes"], None)
            self.failUnlessEqual(full["configured-sharebytes"], None)
        d.addCallback(_check)
    def test_no_st_blocks(self):
        """When os.stat() results lack .st_blocks, the expirer must fall
        back to reporting diskbytes equal to sharebytes.

        NOTE(review): lines are missing from this copy of the file (part
        of a comment, crawler speed-up settings, the `def _wait():`
        header, the `s = lc.get_state()` assignment, and the final
        `return d`); comments describe only the visible code.
        """
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_time= means the "configured-"
        # space-recovered counts will be non-zero, since all shares will have
        # make it start sooner than usual.
        lc = ss.lease_checker
        self.make_shares(ss)
        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)
        def _check(ignored):
            last = s["history"][0]
            rec = last["space-recovered"]
            # all four shares/buckets counted as configured-recoverable
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
    def test_share_corruption(self):
        """A corrupted share file must not stop the lease crawler: the
        crawler records it under "corrupt-shares" (checked via state, JSON
        rendering, and HTML rendering, both mid-cycle and in history) and
        continues examining the other buckets.

        NOTE(review): lines are missing from this copy of the file (the
        closing bracket of the ignore-list, the open()/close() around
        f.write, crawler speed-up settings, `s = lc.get_state()`
        assignments, `return d2`, a `def _wait():` header, and the final
        `return d`); comments describe only the visible code.
        """
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        # create a few shares, with some leases on them
        self.make_shares(ss)
        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f.write("BAD MAGIC")
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)
        ss.setServiceParent(self.s)
        d = fireEventually()
        # now examine the state right after the first bucket has been
        def _after_first_bucket(ignored):
            # reschedule ourselves until cycle-to-date data is available
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            # the corrupt share counts as an examined bucket but not an
            # examined share; it is listed under corrupt-shares instead
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))
        def _after_first_cycle(ignored):
            last = s["history"][0]
            rec = last["space-recovered"]
            # 4 shares + 1 empty bucket = 5 buckets, but only 3 readable shares
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)
        # the corrupt shares produced logged errors; flush them so trial
        # does not fail the test for unhandled log events
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
    def render_json(self, page):
        """Render *page* with a t=json query argument; yields the Deferred
        from render1 (the original presumably ends with `return d` on a
        line not visible in this copy -- confirm)."""
        d = self.render1(page, args={"t": ["json"]})
2469 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2472 self.s = service.MultiService()
2473 self.s.startService()
2475 return self.s.stopService()
2477 def test_no_server(self):
2478 w = StorageStatus(None)
2479 html = w.renderSynchronously()
2480 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
    def test_status(self):
        """Basic storage-status page: HTML shows that shares are accepted
        and no space is reserved; the JSON form carries the stats counters
        plus bucket-counter and lease-checker sections.

        NOTE(review): lines are missing from this copy of the file (the
        initial `d = self.render1(w)`, the `s = data["stats"]` assignment
        inside _check_json, and the final `return d`); comments describe
        only the visible code.
        """
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # `s` is presumably data["stats"], assigned on a non-visible line
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
    def render_json(self, page):
        """Render *page* with a t=json query argument; yields the Deferred
        from render1 (the original presumably ends with `return d` on a
        line not visible in this copy -- confirm)."""
        d = self.render1(page, args={"t": ["json"]})
2510 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2511 def test_status_no_disk_stats(self, mock_get_disk_stats):
2512 mock_get_disk_stats.side_effect = AttributeError()
2514 # Some platforms may have no disk stats API. Make sure the code can handle that
2515 # (test runs on all platforms).
2516 basedir = "storage/WebStatus/status_no_disk_stats"
2517 fileutil.make_dirs(basedir)
2518 ss = StorageServer(basedir, "\x00" * 20)
2519 ss.setServiceParent(self.s)
2520 w = StorageStatus(ss)
2521 html = w.renderSynchronously()
2522 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2523 s = remove_tags(html)
2524 self.failUnlessIn("Accepting new shares: Yes", s)
2525 self.failUnlessIn("Total disk space: ?", s)
2526 self.failUnlessIn("Space Available to Tahoe: ?", s)
2527 self.failUnless(ss.get_available_space() is None)
2529 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2530 def test_status_bad_disk_stats(self, mock_get_disk_stats):
2531 mock_get_disk_stats.side_effect = OSError()
2533 # If the API to get disk stats exists but a call to it fails, then the status should
2534 # show that no shares will be accepted, and get_available_space() should be 0.
2535 basedir = "storage/WebStatus/status_bad_disk_stats"
2536 fileutil.make_dirs(basedir)
2537 ss = StorageServer(basedir, "\x00" * 20)
2538 ss.setServiceParent(self.s)
2539 w = StorageStatus(ss)
2540 html = w.renderSynchronously()
2541 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2542 s = remove_tags(html)
2543 self.failUnlessIn("Accepting new shares: No", s)
2544 self.failUnlessIn("Total disk space: ?", s)
2545 self.failUnlessIn("Space Available to Tahoe: ?", s)
2546 self.failUnlessEqual(ss.get_available_space(), 0)
2548 def test_readonly(self):
2549 basedir = "storage/WebStatus/readonly"
2550 fileutil.make_dirs(basedir)
2551 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2552 ss.setServiceParent(self.s)
2553 w = StorageStatus(ss)
2554 html = w.renderSynchronously()
2555 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2556 s = remove_tags(html)
2557 self.failUnlessIn("Accepting new shares: No", s)
2559 def test_reserved(self):
2560 basedir = "storage/WebStatus/reserved"
2561 fileutil.make_dirs(basedir)
2562 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2563 ss.setServiceParent(self.s)
2564 w = StorageStatus(ss)
2565 html = w.renderSynchronously()
2566 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2567 s = remove_tags(html)
2568 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2570 def test_huge_reserved(self):
2571 basedir = "storage/WebStatus/reserved"
2572 fileutil.make_dirs(basedir)
2573 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2574 ss.setServiceParent(self.s)
2575 w = StorageStatus(ss)
2576 html = w.renderSynchronously()
2577 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2578 s = remove_tags(html)
2579 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2581 def test_util(self):
2582 w = StorageStatus(None)
2583 self.failUnlessEqual(w.render_space(None, None), "?")
2584 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2585 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2586 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2587 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2588 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)