1 import time, os.path, platform, stat, re, simplejson, struct
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.interfaces import BadWriteEnablerError
24 from allmydata.test.common import LoggingServiceParent
25 from allmydata.test.common_web import WebRenderingMixin
26 from allmydata.web.storage import StorageStatus, remove_prefix
31 def __init__(self, ignore_disconnectors=False):
32 self.ignore = ignore_disconnectors
33 self.disconnectors = {}
    # NOTE(review): interior lines of this class are missing from this
    # extract (e.g. where the marker `m` is created); code kept verbatim.
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Register f(*args, **kwargs) to be fired on simulated disconnect.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # Unregister a callback previously registered via notifyOnDisconnect.
        del self.disconnectors[marker]

class FakeStatsProvider:
    """Minimal stand-in for a stats provider (method bodies not visible in
    this extract)."""
    def count(self, name, delta=1):
    def register_producer(self, producer):
51 class Bucket(unittest.TestCase):
52 def make_workdir(self, name):
53 basedir = os.path.join("storage", "Bucket", name)
54 incoming = os.path.join(basedir, "tmp", "bucket")
55 final = os.path.join(basedir, "bucket")
56 fileutil.make_dirs(basedir)
57 fileutil.make_dirs(os.path.join(basedir, "tmp"))
58 return incoming, final
    def bucket_writer_closed(self, bw, consumed):
        """BucketWriter close hook; body outside this extract."""
    def add_latency(self, category, latency):
        """Stats hook; body outside this extract."""
    def count(self, name, delta=1):
        """Stats hook; body outside this extract."""
        # NOTE(review): the `def make_lease(self):` line falls outside this
        # extract; the lines below are its body — build a LeaseInfo with
        # random secrets and an expiry ~5000s in the future.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        # Write 82 bytes into a 200-byte bucket in four chunks.
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        """Write three chunks through a BucketWriter, then read the same
        bytes back through a BucketReader."""
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    def callRemote(self, methname, *args, **kwargs):
        # Dispatch "callRemote(x)" to self.target.remote_x, wrapped in a
        # Deferred to mimic a foolscap remote call.
        # NOTE(review): the inner `def _call():` line is outside this extract.
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
108 class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        """Create a workdir and a BucketWriter of the given size.

        NOTE(review): the tail of this method (remaining BucketWriter args
        and the return statement) is outside this extract."""
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),

    def make_lease(self):
        # Build a LeaseInfo with random secrets and ~5000s expiry.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        """BucketWriter close hook; body outside this extract."""
    def add_latency(self, category, latency):
        """Stats hook; body outside this extract."""
    def count(self, name, delta=1):
        """Stats hook; body outside this extract."""

    def test_create(self):
        # A WriteBucketProxy over a fresh bucket should provide
        # IStorageBucketWriter.
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        """Round-trip a synthetic share through a write proxy and read it
        back through the matching read proxy (shared by the v1/v2 tests).

        NOTE(review): several continuation lines (list-comprehension tails,
        proxy-constructor arguments) are outside this extract."""
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
                        uri_extension_size_max=len(uri_extension),

        # write all the pieces in order, then close.
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

        d.addCallback(_start_reading)
223 def test_readwrite_v1(self):
224 return self._do_test_readwrite("test_readwrite_v1",
225 0x24, WriteBucketProxy, ReadBucketProxy)
227 def test_readwrite_v2(self):
228 return self._do_test_readwrite("test_readwrite_v2",
229 0x44, WriteBucketProxy_v2, ReadBucketProxy)
231 class Server(unittest.TestCase):
        # NOTE(review): the setUp/tearDown def lines are outside this
        # extract; the lines below are their bodies.
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        """Per-test working directory; tail of method outside this extract."""
        basedir = os.path.join("storage", "Server", name)

    def create(self, name, reserved_space=0, klass=StorageServer):
        """Build a StorageServer (or klass) in the test workdir.

        NOTE(review): the return line is outside this extract."""
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)

    def test_create(self):
        # Smoke test: server construction should not raise.
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        """Allocate buckets with fresh lease secrets derived from a counter."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        """Allocate a >4GiB share; needs a sparse-file-capable filesystem."""
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")

        avail = fileutil.get_available_space('.', 512*2**20)
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        """After shares are closed, the per-bucket and per-prefix incoming
        dirs should be cleaned up, leaving only the top incoming/ dir."""
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)
    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)
    def test_allocate(self):
        """Exercise the allocate/write/close/read cycle and re-allocation.

        NOTE(review): several interior lines (closes, aborts, b_str
        assignment) are outside this extract."""
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # aborting a bucket that was already closed is a no-op
        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
        for i,wb in writers.items():
    def test_bad_container_version(self):
        """A share whose container version field is 0 should raise
        UnknownImmutableContainerVersionError when read."""
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))
    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # fire each registered disconnect callback by hand
        for (f,args,kwargs) in canary.disconnectors.values():

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        """Verify reserved_space accounting with mocked disk stats.

        NOTE(review): several interior lines (reserved_space assignment,
        dict-literal closers, abort/close loops) are outside this extract."""
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        # NOTE(review): this method's `def` line and some writes/seeks are
        # outside this extract; it checks that mode "rb+" permits seeking
        # past EOF without truncating existing content.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")

        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")

        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        """Exercise lease creation, add-lease, renewal, and cancellation.

        NOTE(review): sharenums/size setup and the close loops' bodies are
        outside this extract."""
        ss = self.create("test_leases")
        canary = FakeCanary()
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # only the proper cancel_secret should cancel
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
    def test_readonly(self):
        """A readonly server accepts no writers and reports 0 disk_avail."""
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        """Corruption advisories (server-level and via RIBucketWriter)
        should produce report files naming the storage index."""
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)
722 class MutableServer(unittest.TestCase):
        # NOTE(review): the setUp/tearDown def lines are outside this
        # extract; the lines below are their bodies.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        """Per-test working directory; tail of method outside this extract."""
        basedir = os.path.join("storage", "MutableServer", name)

    def create(self, name):
        """Build a StorageServer in the test workdir.

        NOTE(review): the return line is outside this extract."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_create(self):
        # Smoke test: server construction should not raise.
        self.create("test_create")
743 def write_enabler(self, we_tag):
744 return hashutil.tagged_hash("we_blah", we_tag)
746 def renew_secret(self, tag):
747 return hashutil.tagged_hash("renew_blah", str(tag))
749 def cancel_secret(self, tag):
750 return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        """Create empty mutable slots via testv_and_readv_and_writev.

        NOTE(review): the writev/new-length arguments of the rstaraw call
        are outside this extract."""
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        # empty testv for every requested share number
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )

        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_bad_magic(self):
        """A mutable share with a corrupted magic should raise
        UnknownMutableContainerVersionError on read.

        NOTE(review): the lines that corrupt the share file are outside
        this extract."""
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))
    def test_container_size(self):
        """Mutable containers can grow (bounded by MAX_SIZE) and shrink;
        shrinking to zero deletes the share."""
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})
    def test_allocate(self):
        """Reads on fresh slots return empty strings; writes honor the
        write-enabler and test vectors.

        NOTE(review): several continuation lines of the write/testv calls
        are outside this extract."""
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        # NOTE(review): each testv's trailing arguments (write vector,
        # read vector, reset call) fall outside this extract.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        write("si1", secrets,
              {0: ([], [(0,data)], None)},

        # "lt" fails when the test value is below the stored bytes
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "le"
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "eq"
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "ne"
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "ge"
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # "gt"
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1072 def test_readv(self):
1073 ss = self.create("test_readv")
1074 secrets = ( self.write_enabler("we1"),
1075 self.renew_secret("we1"),
1076 self.cancel_secret("we1") )
1077 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1078 write = ss.remote_slot_testv_and_readv_and_writev
1079 read = ss.remote_slot_readv
1080 data = [("%d" % i) * 100 for i in range(3)]
1081 rc = write("si1", secrets,
1082 {0: ([], [(0,data[0])], None),
1083 1: ([], [(0,data[1])], None),
1084 2: ([], [(0,data[2])], None),
1086 self.failUnlessEqual(rc, (True, {}))
1088 answer = read("si1", [], [(0, 10)])
1089 self.failUnlessEqual(answer, {0: ["0"*10],
1093 def compare_leases_without_timestamps(self, leases_a, leases_b):
# Assert that two lease lists are pairwise equal on every field except
# expiration_time: owner_num, renew_secret, cancel_secret and nodeid.
# Used where an operation (e.g. a container move) may legitimately
# refresh lease timestamps; compare_leases() below also checks them.
1094 self.failUnlessEqual(len(leases_a), len(leases_b))
1095 for i in range(len(leases_a)):
# NOTE(review): the lines binding a/b (original 1096-1097) are elided
# in this view -- a and b below are the i'th lease from each list.
1098 self.failUnlessEqual(a.owner_num, b.owner_num)
1099 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1100 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1101 self.failUnlessEqual(a.nodeid, b.nodeid)
1103 def compare_leases(self, leases_a, leases_b):
# Strict pairwise lease comparison: every field checked by
# compare_leases_without_timestamps() PLUS expiration_time, so any
# timestamp change (renewal) makes this assertion fail.
1104 self.failUnlessEqual(len(leases_a), len(leases_b))
1105 for i in range(len(leases_a)):
# NOTE(review): the lines binding a/b (original 1106-1107) are elided
# in this view -- a and b below are the i'th lease from each list.
1108 self.failUnlessEqual(a.owner_num, b.owner_num)
1109 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1110 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1111 self.failUnlessEqual(a.nodeid, b.nodeid)
1112 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1114 def test_leases(self):
1115 ss = self.create("test_leases")
1117 return ( self.write_enabler("we1"),
1118 self.renew_secret("we1-%d" % n),
1119 self.cancel_secret("we1-%d" % n) )
1120 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1121 write = ss.remote_slot_testv_and_readv_and_writev
1122 read = ss.remote_slot_readv
1123 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1124 self.failUnlessEqual(rc, (True, {}))
1126 # create a random non-numeric file in the bucket directory, to
1127 # exercise the code that's supposed to ignore those.
1128 bucket_dir = os.path.join(self.workdir("test_leases"),
1129 "shares", storage_index_to_dir("si1"))
1130 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1131 f.write("you ought to be ignoring me\n")
1134 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1135 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1137 # add-lease on a missing storage index is silently ignored
1138 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1140 # re-allocate the slots and use the same secrets, that should update
1142 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1143 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1146 ss.remote_renew_lease("si1", secrets(0)[1])
1147 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1149 # now allocate them with a bunch of different secrets, to trigger the
1150 # extended lease code. Use add_lease for one of them.
1151 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1152 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1153 secrets2 = secrets(2)
1154 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1155 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1156 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1157 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1158 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1160 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1162 # cancel one of them
1163 ss.remote_cancel_lease("si1", secrets(5)[2])
1164 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1166 all_leases = list(s0.get_leases())
1167 # and write enough data to expand the container, forcing the server
1168 # to move the leases
1169 write("si1", secrets(0),
1170 {0: ([], [(0,data)], 200), },
1173 # read back the leases, make sure they're still intact.
1174 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1176 ss.remote_renew_lease("si1", secrets(0)[1])
1177 ss.remote_renew_lease("si1", secrets(1)[1])
1178 ss.remote_renew_lease("si1", secrets(2)[1])
1179 ss.remote_renew_lease("si1", secrets(3)[1])
1180 ss.remote_renew_lease("si1", secrets(4)[1])
1181 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1182 # get a new copy of the leases, with the current timestamps. Reading
1183 # data and failing to renew/cancel leases should leave the timestamps
1185 all_leases = list(s0.get_leases())
1186 # renewing with a bogus token should prompt an error message
1188 # examine the exception thus raised, make sure the old nodeid is
1189 # present, to provide for share migration
1190 e = self.failUnlessRaises(IndexError,
1191 ss.remote_renew_lease, "si1",
1194 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1195 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1196 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1198 # same for cancelling
1199 self.failUnlessRaises(IndexError,
1200 ss.remote_cancel_lease, "si1",
1202 self.compare_leases(all_leases, list(s0.get_leases()))
1204 # reading shares should not modify the timestamp
1205 read("si1", [], [(0,200)])
1206 self.compare_leases(all_leases, list(s0.get_leases()))
1208 write("si1", secrets(0),
1209 {0: ([], [(200, "make me bigger")], None)}, [])
1210 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1212 write("si1", secrets(0),
1213 {0: ([], [(500, "make me really bigger")], None)}, [])
1214 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1216 # now cancel them all
1217 ss.remote_cancel_lease("si1", secrets(0)[2])
1218 ss.remote_cancel_lease("si1", secrets(1)[2])
1219 ss.remote_cancel_lease("si1", secrets(2)[2])
1220 ss.remote_cancel_lease("si1", secrets(3)[2])
1222 # the slot should still be there
1223 remaining_shares = read("si1", [], [(0,10)])
1224 self.failUnlessEqual(len(remaining_shares), 1)
1225 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1227 # cancelling a non-existent lease should raise an IndexError
1228 self.failUnlessRaises(IndexError,
1229 ss.remote_cancel_lease, "si1", "nonsecret")
1231 # and the slot should still be there
1232 remaining_shares = read("si1", [], [(0,10)])
1233 self.failUnlessEqual(len(remaining_shares), 1)
1234 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1236 ss.remote_cancel_lease("si1", secrets(4)[2])
1237 # now the slot should be gone
1238 no_shares = read("si1", [], [(0,10)])
1239 self.failUnlessEqual(no_shares, {})
1241 # cancelling a lease on a non-existent share should raise an IndexError
1242 self.failUnlessRaises(IndexError,
1243 ss.remote_cancel_lease, "si2", "nonsecret")
1245 def test_remove(self):
1246 ss = self.create("test_remove")
1247 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1249 readv = ss.remote_slot_readv
1250 writev = ss.remote_slot_testv_and_readv_and_writev
1251 secrets = ( self.write_enabler("we1"),
1252 self.renew_secret("we1"),
1253 self.cancel_secret("we1") )
1254 # delete sh0 by setting its size to zero
1255 answer = writev("si1", secrets,
1258 # the answer should mention all the shares that existed before the
1260 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1261 # but a new read should show only sh1 and sh2
1262 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1265 # delete sh1 by setting its size to zero
1266 answer = writev("si1", secrets,
1269 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1270 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273 # delete sh2 by setting its size to zero
1274 answer = writev("si1", secrets,
1277 self.failUnlessEqual(answer, (True, {2:[]}) )
1278 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1280 # and the bucket directory should now be gone
1281 si = base32.b2a("si1")
1282 # note: this is a detail of the storage server implementation, and
1283 # may change in the future
1285 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1286 bucketdir = os.path.join(prefixdir, si)
1287 self.failUnless(os.path.exists(prefixdir), prefixdir)
1288 self.failIf(os.path.exists(bucketdir), bucketdir)
1290 class Stats(unittest.TestCase):
1293 self.sparent = LoggingServiceParent()
1294 self._lease_secret = itertools.count()
1296 return self.sparent.stopService()
1298 def workdir(self, name):
1299 basedir = os.path.join("storage", "Server", name)
1302 def create(self, name):
1303 workdir = self.workdir(name)
1304 ss = StorageServer(workdir, "\x00" * 20)
1305 ss.setServiceParent(self.sparent)
1308 def test_latencies(self):
1309 ss = self.create("test_latencies")
1310 for i in range(10000):
1311 ss.add_latency("allocate", 1.0 * i)
1312 for i in range(1000):
1313 ss.add_latency("renew", 1.0 * i)
1315 ss.add_latency("cancel", 2.0 * i)
1316 ss.add_latency("get", 5.0)
1318 output = ss.get_latencies()
1320 self.failUnlessEqual(sorted(output.keys()),
1321 sorted(["allocate", "renew", "cancel", "get"]))
1322 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1323 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1324 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1325 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1326 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1327 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1328 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1329 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1330 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1332 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1333 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1334 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
1335 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1336 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1337 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1338 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1339 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1340 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1342 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1343 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1344 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1, output)
1345 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
1346 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1347 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1348 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1349 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1350 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1352 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1353 self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1354 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1355 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1356 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1357 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1358 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1359 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1360 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1363 s = re.sub(r'<[^>]*>', ' ', s)
1364 s = re.sub(r'\s+', ' ', s)
1367 class MyBucketCountingCrawler(BucketCountingCrawler):
1368 def finished_prefix(self, cycle, prefix):
1369 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1371 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is the hook-firing test crawler.

    Overrides add_bucket_counter() so the crawler installed on this
    server is a MyBucketCountingCrawler, which pops and fires a Deferred
    from self.hook_ds each time it finishes a prefix -- letting tests
    synchronize with crawl progress.
    """
    def add_bucket_counter(self):
        # Keep the stock state-file location, but attach the
        # instrumented crawler in place of the default one.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
1380 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1383 self.s = service.MultiService()
1384 self.s.startService()
1386 return self.s.stopService()
1388 def test_bucket_counter(self):
1389 basedir = "storage/BucketCounter/bucket_counter"
1390 fileutil.make_dirs(basedir)
1391 ss = StorageServer(basedir, "\x00" * 20)
1392 # to make sure we capture the bucket-counting-crawler in the middle
1393 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1394 # also make it start sooner than usual.
1395 ss.bucket_counter.slow_start = 0
1396 orig_cpu_slice = ss.bucket_counter.cpu_slice
1397 ss.bucket_counter.cpu_slice = 0
1398 ss.setServiceParent(self.s)
1400 w = StorageStatus(ss)
1402 # this sample is before the crawler has started doing anything
1403 html = w.renderSynchronously()
1404 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1405 s = remove_tags(html)
1406 self.failUnlessIn("Accepting new shares: Yes", s)
1407 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1408 self.failUnlessIn("Total buckets: Not computed yet", s)
1409 self.failUnlessIn("Next crawl in", s)
1411 # give the bucket-counting-crawler one tick to get started. The
1412 # cpu_slice=0 will force it to yield right after it processes the
1415 d = fireEventually()
1416 def _check(ignored):
1417 # are we really right after the first prefix?
1418 state = ss.bucket_counter.get_state()
1419 if state["last-complete-prefix"] is None:
1420 d2 = fireEventually()
1421 d2.addCallback(_check)
1423 self.failUnlessEqual(state["last-complete-prefix"],
1424 ss.bucket_counter.prefixes[0])
1425 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1426 html = w.renderSynchronously()
1427 s = remove_tags(html)
1428 self.failUnlessIn(" Current crawl ", s)
1429 self.failUnlessIn(" (next work in ", s)
1430 d.addCallback(_check)
1432 # now give it enough time to complete a full cycle
1434 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1435 d.addCallback(lambda ignored: self.poll(_watch))
1436 def _check2(ignored):
1437 ss.bucket_counter.cpu_slice = orig_cpu_slice
1438 html = w.renderSynchronously()
1439 s = remove_tags(html)
1440 self.failUnlessIn("Total buckets: 0 (the number of", s)
1441 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1442 d.addCallback(_check2)
1445 def test_bucket_counter_cleanup(self):
1446 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1447 fileutil.make_dirs(basedir)
1448 ss = StorageServer(basedir, "\x00" * 20)
1449 # to make sure we capture the bucket-counting-crawler in the middle
1450 # of a cycle, we reach in and reduce its maximum slice time to 0.
1451 ss.bucket_counter.slow_start = 0
1452 orig_cpu_slice = ss.bucket_counter.cpu_slice
1453 ss.bucket_counter.cpu_slice = 0
1454 ss.setServiceParent(self.s)
1456 d = fireEventually()
1458 def _after_first_prefix(ignored):
1459 state = ss.bucket_counter.state
1460 if state["last-complete-prefix"] is None:
1461 d2 = fireEventually()
1462 d2.addCallback(_after_first_prefix)
1464 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1465 # now sneak in and mess with its state, to make sure it cleans up
1466 # properly at the end of the cycle
1467 self.failUnlessEqual(state["last-complete-prefix"],
1468 ss.bucket_counter.prefixes[0])
1469 state["bucket-counts"][-12] = {}
1470 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1471 ss.bucket_counter.save_state()
1472 d.addCallback(_after_first_prefix)
1474 # now give it enough time to complete a cycle
1476 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1477 d.addCallback(lambda ignored: self.poll(_watch))
1478 def _check2(ignored):
1479 ss.bucket_counter.cpu_slice = orig_cpu_slice
1480 s = ss.bucket_counter.get_state()
1481 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1482 self.failIf("bogusprefix!" in s["storage-index-samples"],
1483 s["storage-index-samples"].keys())
1484 d.addCallback(_check2)
1487 def test_bucket_counter_eta(self):
1488 basedir = "storage/BucketCounter/bucket_counter_eta"
1489 fileutil.make_dirs(basedir)
1490 ss = MyStorageServer(basedir, "\x00" * 20)
1491 ss.bucket_counter.slow_start = 0
1492 # these will be fired inside finished_prefix()
1493 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1494 w = StorageStatus(ss)
1496 d = defer.Deferred()
1498 def _check_1(ignored):
1499 # no ETA is available yet
1500 html = w.renderSynchronously()
1501 s = remove_tags(html)
1502 self.failUnlessIn("complete (next work", s)
1504 def _check_2(ignored):
1505 # one prefix has finished, so an ETA based upon that elapsed time
1506 # should be available.
1507 html = w.renderSynchronously()
1508 s = remove_tags(html)
1509 self.failUnlessIn("complete (ETA ", s)
1511 def _check_3(ignored):
1512 # two prefixes have finished
1513 html = w.renderSynchronously()
1514 s = remove_tags(html)
1515 self.failUnlessIn("complete (ETA ", s)
1518 hooks[0].addCallback(_check_1).addErrback(d.errback)
1519 hooks[1].addCallback(_check_2).addErrback(d.errback)
1520 hooks[2].addCallback(_check_3).addErrback(d.errback)
1522 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that can pause itself after one bucket.

    Arming stop_after_first_bucket makes the next processed bucket
    force an immediate yield (cpu_slice driven negative), so a test can
    examine mid-cycle crawler state; the flag then disarms itself.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # One-shot: disarm the trigger and force the crawler to yield
        # right away by exhausting its time slice.
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # Once disarmed, give subsequent ticks a generous slice so the
        # rest of the cycle finishes quickly.
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
1536 class BrokenStatResults:
1538 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1541 bsr = BrokenStatResults()
1542 for attrname in dir(s):
1543 if attrname.startswith("_"):
1545 if attrname == "st_blocks":
1547 setattr(bsr, attrname, getattr(s, attrname))
# StorageServer whose lease checker is the instrumented crawler that can
# pause itself after processing its first bucket.
1550 class InstrumentedStorageServer(StorageServer):
1551 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
# StorageServer whose lease checker sees stat results with st_blocks
# removed, exercising platforms where os.stat() lacks that attribute.
1552 class No_ST_BLOCKS_StorageServer(StorageServer):
1553 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1555 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1558 self.s = service.MultiService()
1559 self.s.startService()
1561 return self.s.stopService()
1563 def make_shares(self, ss):
1565 return (si, hashutil.tagged_hash("renew", si),
1566 hashutil.tagged_hash("cancel", si))
1567 def make_mutable(si):
1568 return (si, hashutil.tagged_hash("renew", si),
1569 hashutil.tagged_hash("cancel", si),
1570 hashutil.tagged_hash("write-enabler", si))
1571 def make_extra_lease(si, num):
1572 return (hashutil.tagged_hash("renew-%d" % num, si),
1573 hashutil.tagged_hash("cancel-%d" % num, si))
1575 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1576 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1577 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1578 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1579 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1580 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1582 canary = FakeCanary()
1583 # note: 'tahoe debug dump-share' will not handle this file, since the
1584 # inner contents are not a valid CHK share
1585 data = "\xff" * 1000
1587 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1589 w[0].remote_write(0, data)
1592 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1594 w[0].remote_write(0, data)
1596 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1598 writev = ss.remote_slot_testv_and_readv_and_writev
1599 writev(mutable_si_2, (we2, rs2, cs2),
1600 {0: ([], [(0,data)], len(data))}, [])
1601 writev(mutable_si_3, (we3, rs3, cs3),
1602 {0: ([], [(0,data)], len(data))}, [])
1603 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1605 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1606 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1607 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1609 def test_basic(self):
1610 basedir = "storage/LeaseCrawler/basic"
1611 fileutil.make_dirs(basedir)
1612 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1613 # make it start sooner than usual.
1614 lc = ss.lease_checker
1617 lc.stop_after_first_bucket = True
1618 webstatus = StorageStatus(ss)
1620 # create a few shares, with some leases on them
1621 self.make_shares(ss)
1622 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1624 # add a non-sharefile to exercise another code path
1625 fn = os.path.join(ss.sharedir,
1626 storage_index_to_dir(immutable_si_0),
1629 f.write("I am not a share.\n")
1632 # this is before the crawl has started, so we're not in a cycle yet
1633 initial_state = lc.get_state()
1634 self.failIf(lc.get_progress()["cycle-in-progress"])
1635 self.failIfIn("cycle-to-date", initial_state)
1636 self.failIfIn("estimated-remaining-cycle", initial_state)
1637 self.failIfIn("estimated-current-cycle", initial_state)
1638 self.failUnlessIn("history", initial_state)
1639 self.failUnlessEqual(initial_state["history"], {})
1641 ss.setServiceParent(self.s)
1645 d = fireEventually()
1647 # now examine the state right after the first bucket has been
1649 def _after_first_bucket(ignored):
1650 initial_state = lc.get_state()
1651 if "cycle-to-date" not in initial_state:
1652 d2 = fireEventually()
1653 d2.addCallback(_after_first_bucket)
1655 self.failUnlessIn("cycle-to-date", initial_state)
1656 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1657 self.failUnlessIn("estimated-current-cycle", initial_state)
1658 self.failUnlessIn("history", initial_state)
1659 self.failUnlessEqual(initial_state["history"], {})
1661 so_far = initial_state["cycle-to-date"]
1662 self.failUnlessEqual(so_far["expiration-enabled"], False)
1663 self.failUnlessIn("configured-expiration-mode", so_far)
1664 self.failUnlessIn("lease-age-histogram", so_far)
1665 lah = so_far["lease-age-histogram"]
1666 self.failUnlessEqual(type(lah), list)
1667 self.failUnlessEqual(len(lah), 1)
1668 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1669 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1670 self.failUnlessEqual(so_far["corrupt-shares"], [])
1671 sr1 = so_far["space-recovered"]
1672 self.failUnlessEqual(sr1["examined-buckets"], 1)
1673 self.failUnlessEqual(sr1["examined-shares"], 1)
1674 self.failUnlessEqual(sr1["actual-shares"], 0)
1675 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1676 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1677 left = initial_state["estimated-remaining-cycle"]
1678 sr2 = left["space-recovered"]
1679 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1680 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1681 self.failIfEqual(sr2["actual-shares"], None)
1682 self.failIfEqual(sr2["configured-diskbytes"], None)
1683 self.failIfEqual(sr2["original-sharebytes"], None)
1684 d.addCallback(_after_first_bucket)
1685 d.addCallback(lambda ign: self.render1(webstatus))
1686 def _check_html_in_cycle(html):
1687 s = remove_tags(html)
1688 self.failUnlessIn("So far, this cycle has examined "
1689 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1690 self.failUnlessIn("and has recovered: "
1691 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1692 "0 B (0 B / 0 B)", s)
1693 self.failUnlessIn("If expiration were enabled, "
1694 "we would have recovered: "
1695 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1696 " 0 B (0 B / 0 B) by now", s)
1697 self.failUnlessIn("and the remainder of this cycle "
1698 "would probably recover: "
1699 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1700 " 0 B (0 B / 0 B)", s)
1701 self.failUnlessIn("and the whole cycle would probably recover: "
1702 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1703 " 0 B (0 B / 0 B)", s)
1704 self.failUnlessIn("if we were strictly using each lease's default "
1705 "31-day lease lifetime", s)
1706 self.failUnlessIn("this cycle would be expected to recover: ", s)
1707 d.addCallback(_check_html_in_cycle)
1709 # wait for the crawler to finish the first cycle. Nothing should have
1712 return bool(lc.get_state()["last-cycle-finished"] is not None)
1713 d.addCallback(lambda ign: self.poll(_wait))
1715 def _after_first_cycle(ignored):
1717 self.failIf("cycle-to-date" in s)
1718 self.failIf("estimated-remaining-cycle" in s)
1719 self.failIf("estimated-current-cycle" in s)
1720 last = s["history"][0]
1721 self.failUnlessIn("cycle-start-finish-times", last)
1722 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1723 self.failUnlessEqual(last["expiration-enabled"], False)
1724 self.failUnlessIn("configured-expiration-mode", last)
1726 self.failUnlessIn("lease-age-histogram", last)
1727 lah = last["lease-age-histogram"]
1728 self.failUnlessEqual(type(lah), list)
1729 self.failUnlessEqual(len(lah), 1)
1730 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1732 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1733 self.failUnlessEqual(last["corrupt-shares"], [])
1735 rec = last["space-recovered"]
1736 self.failUnlessEqual(rec["examined-buckets"], 4)
1737 self.failUnlessEqual(rec["examined-shares"], 4)
1738 self.failUnlessEqual(rec["actual-buckets"], 0)
1739 self.failUnlessEqual(rec["original-buckets"], 0)
1740 self.failUnlessEqual(rec["configured-buckets"], 0)
1741 self.failUnlessEqual(rec["actual-shares"], 0)
1742 self.failUnlessEqual(rec["original-shares"], 0)
1743 self.failUnlessEqual(rec["configured-shares"], 0)
1744 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1745 self.failUnlessEqual(rec["original-diskbytes"], 0)
1746 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1747 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1748 self.failUnlessEqual(rec["original-sharebytes"], 0)
1749 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1751 def _get_sharefile(si):
1752 return list(ss._iter_share_files(si))[0]
1753 def count_leases(si):
1754 return len(list(_get_sharefile(si).get_leases()))
1755 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1756 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1757 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1758 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1759 d.addCallback(_after_first_cycle)
1760 d.addCallback(lambda ign: self.render1(webstatus))
1761 def _check_html(html):
1762 s = remove_tags(html)
1763 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1764 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1765 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1766 "(2 mutable / 2 immutable),", s)
1767 self.failUnlessIn("but expiration was not enabled", s)
1768 d.addCallback(_check_html)
1769 d.addCallback(lambda ign: self.render_json(webstatus))
1770 def _check_json(json):
1771 data = simplejson.loads(json)
1772 self.failUnlessIn("lease-checker", data)
1773 self.failUnlessIn("lease-checker-progress", data)
1774 d.addCallback(_check_json)
1777 def backdate_lease(self, sf, renew_secret, new_expire_time):
1778 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1779 # "renew" a lease with a new_expire_time that is older than what the
1780 # current lease has), so we have to reach inside it.
# Scan the leases for the one matching renew_secret and overwrite its
# expiration_time directly in the share file, bypassing the
# renew-only public API.
1781 for i,lease in enumerate(sf.get_leases()):
1782 if lease.renew_secret == renew_secret:
1783 lease.expiration_time = new_expire_time
# Rewrite just the i'th lease record in place via the private helper.
1784 f = open(sf.home, 'rb+')
1785 sf._write_lease_record(f, i, lease)
# NOTE(review): the success-path lines (original 1786-1787, presumably
# closing f and returning) are elided in this view -- confirm before
# relying on the fall-through below.
# Mirrors the server's own "unable to renew non-existent lease" error
# when no lease matches the given renew_secret.
1788 raise IndexError("unable to renew non-existent lease")
1790 def test_expire_age(self):
1791 basedir = "storage/LeaseCrawler/expire_age"
1792 fileutil.make_dirs(basedir)
1793 # setting expiration_time to 2000 means that any lease which is more
1794 # than 2000s old will be expired.
1795 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1796 expiration_enabled=True,
1797 expiration_mode="age",
1798 expiration_override_lease_duration=2000)
1799 # make it start sooner than usual.
1800 lc = ss.lease_checker
1802 lc.stop_after_first_bucket = True
1803 webstatus = StorageStatus(ss)
1805 # create a few shares, with some leases on them
1806 self.make_shares(ss)
1807 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1809 def count_shares(si):
1810 return len(list(ss._iter_share_files(si)))
1811 def _get_sharefile(si):
1812 return list(ss._iter_share_files(si))[0]
1813 def count_leases(si):
1814 return len(list(_get_sharefile(si).get_leases()))
1816 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1817 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1818 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1819 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1820 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1821 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1822 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1823 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1825 # artificially crank back the expiration time on the first lease of
1826 # each share, to make it look like it expired already (age=1000s).
1827 # Some shares have an extra lease which is set to expire at the
1828 # default time in 31 days from now (age=31days). We then run the
1829 # crawler, which will expire the first lease, making some shares get
1830 # deleted and others stay alive (with one remaining lease)
1833 sf0 = _get_sharefile(immutable_si_0)
1834 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1835 sf0_size = os.stat(sf0.home).st_size
1837 # immutable_si_1 gets an extra lease
1838 sf1 = _get_sharefile(immutable_si_1)
1839 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1841 sf2 = _get_sharefile(mutable_si_2)
1842 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1843 sf2_size = os.stat(sf2.home).st_size
1845 # mutable_si_3 gets an extra lease
1846 sf3 = _get_sharefile(mutable_si_3)
1847 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1849 ss.setServiceParent(self.s)
1851 d = fireEventually()
1852 # examine the state right after the first bucket has been processed
1853 def _after_first_bucket(ignored):
1854 p = lc.get_progress()
1855 if not p["cycle-in-progress"]:
1856 d2 = fireEventually()
1857 d2.addCallback(_after_first_bucket)
1859 d.addCallback(_after_first_bucket)
1860 d.addCallback(lambda ign: self.render1(webstatus))
1861 def _check_html_in_cycle(html):
1862 s = remove_tags(html)
1863 # the first bucket encountered gets deleted, and its prefix
1864 # happens to be about 1/5th of the way through the ring, so the
1865 # predictor thinks we'll have 5 shares and that we'll delete them
1866 # all. This part of the test depends upon the SIs landing right
1867 # where they do now.
1868 self.failUnlessIn("The remainder of this cycle is expected to "
1869 "recover: 4 shares, 4 buckets", s)
1870 self.failUnlessIn("The whole cycle is expected to examine "
1871 "5 shares in 5 buckets and to recover: "
1872 "5 shares, 5 buckets", s)
1873 d.addCallback(_check_html_in_cycle)
1875 # wait for the crawler to finish the first cycle. Two shares should
1878 return bool(lc.get_state()["last-cycle-finished"] is not None)
1879 d.addCallback(lambda ign: self.poll(_wait))
1881 def _after_first_cycle(ignored):
1882 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1883 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1884 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1885 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1886 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1887 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1890 last = s["history"][0]
1892 self.failUnlessEqual(last["expiration-enabled"], True)
1893 self.failUnlessEqual(last["configured-expiration-mode"],
1894 ("age", 2000, None, ("mutable", "immutable")))
1895 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1897 rec = last["space-recovered"]
1898 self.failUnlessEqual(rec["examined-buckets"], 4)
1899 self.failUnlessEqual(rec["examined-shares"], 4)
1900 self.failUnlessEqual(rec["actual-buckets"], 2)
1901 self.failUnlessEqual(rec["original-buckets"], 2)
1902 self.failUnlessEqual(rec["configured-buckets"], 2)
1903 self.failUnlessEqual(rec["actual-shares"], 2)
1904 self.failUnlessEqual(rec["original-shares"], 2)
1905 self.failUnlessEqual(rec["configured-shares"], 2)
1906 size = sf0_size + sf2_size
1907 self.failUnlessEqual(rec["actual-sharebytes"], size)
1908 self.failUnlessEqual(rec["original-sharebytes"], size)
1909 self.failUnlessEqual(rec["configured-sharebytes"], size)
1910 # different platforms have different notions of "blocks used by
1911 # this file", so merely assert that it's a number
1912 self.failUnless(rec["actual-diskbytes"] >= 0,
1913 rec["actual-diskbytes"])
1914 self.failUnless(rec["original-diskbytes"] >= 0,
1915 rec["original-diskbytes"])
1916 self.failUnless(rec["configured-diskbytes"] >= 0,
1917 rec["configured-diskbytes"])
1918 d.addCallback(_after_first_cycle)
1919 d.addCallback(lambda ign: self.render1(webstatus))
1920 def _check_html(html):
1921 s = remove_tags(html)
1922 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1923 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1924 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1925 d.addCallback(_check_html)
def test_expire_cutoff_date(self):
    """Exercise the lease crawler in 'cutoff-date' expiration mode: leases
    last renewed before the cutoff are expired, and shares whose last
    remaining lease expires are deleted.

    NOTE(review): several lines of this method appear to be missing from
    this excerpt (e.g. the `now = time.time()` binding, a `def _wait():`
    header, the `s = lc.get_state()` binding, a call-continuation line,
    and the trailing `return d`) -- compare against the full file before
    editing.
    """
    basedir = "storage/LeaseCrawler/expire_cutoff_date"
    fileutil.make_dirs(basedir)
    # setting cutoff-date to 2000 seconds ago means that any lease which
    # is more than 2000s old will be expired.
    then = int(now - 2000)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                   expiration_enabled=True,
                                   expiration_mode="cutoff-date",
                                   expiration_cutoff_date=then)
    # make it start sooner than usual.
    lc = ss.lease_checker
    # pause after the first bucket so mid-cycle state can be inspected
    lc.stop_after_first_bucket = True
    webstatus = StorageStatus(ss)

    # create a few shares, with some leases on them
    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

    # small helpers to inspect shares/leases for a given storage index
    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    # initial state: one share each; si_1 and si_3 carry an extra lease
    self.failUnlessEqual(count_shares(immutable_si_0), 1)
    self.failUnlessEqual(count_leases(immutable_si_0), 1)
    self.failUnlessEqual(count_shares(immutable_si_1), 1)
    self.failUnlessEqual(count_leases(immutable_si_1), 2)
    self.failUnlessEqual(count_shares(mutable_si_2), 1)
    self.failUnlessEqual(count_leases(mutable_si_2), 1)
    self.failUnlessEqual(count_shares(mutable_si_3), 1)
    self.failUnlessEqual(count_leases(mutable_si_3), 2)

    # artificially crank back the expiration time on the first lease of
    # each share, to make it look like was renewed 3000s ago. To achieve
    # this, we need to set the expiration time to now-3000+31days. This
    # will change when the lease format is improved to contain both
    # create/renew time and duration.
    new_expiration_time = now - 3000 + 31*24*60*60

    # Some shares have an extra lease which is set to expire at the
    # default time in 31 days from now (age=31days). We then run the
    # crawler, which will expire the first lease, making some shares get
    # deleted and others stay alive (with one remaining lease)

    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf0_size = os.stat(sf0.home).st_size

    # immutable_si_1 gets an extra lease
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf2_size = os.stat(sf2.home).st_size

    # mutable_si_3 gets an extra lease
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

    ss.setServiceParent(self.s)

    d = fireEventually()
    # examine the state right after the first bucket has been processed
    def _after_first_bucket(ignored):
        p = lc.get_progress()
        if not p["cycle-in-progress"]:
            # cycle not started yet: reschedule ourselves
            d2 = fireEventually()
            d2.addCallback(_after_first_bucket)
            # NOTE(review): a `return d2` appears truncated here
    d.addCallback(_after_first_bucket)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html_in_cycle(html):
        s = remove_tags(html)
        # the first bucket encountered gets deleted, and its prefix
        # happens to be about 1/5th of the way through the ring, so the
        # predictor thinks we'll have 5 shares and that we'll delete them
        # all. This part of the test depends upon the SIs landing right
        # where they do now.
        self.failUnlessIn("The remainder of this cycle is expected to "
                          "recover: 4 shares, 4 buckets", s)
        self.failUnlessIn("The whole cycle is expected to examine "
                          "5 shares in 5 buckets and to recover: "
                          "5 shares, 5 buckets", s)
    d.addCallback(_check_html_in_cycle)

    # wait for the crawler to finish the first cycle. Two shares should
    # NOTE(review): the enclosing `def _wait():` line appears truncated here
    return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        # the backdated shares are gone; survivors kept exactly one lease
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 1)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 1)
        # NOTE(review): the `s = lc.get_state()` binding appears truncated here
        last = s["history"][0]

        self.failUnlessEqual(last["expiration-enabled"], True)
        self.failUnlessEqual(last["configured-expiration-mode"],
                             ("cutoff-date", None, then,
                              ("mutable", "immutable")))
        self.failUnlessEqual(last["leases-per-share-histogram"],
                             # NOTE(review): the expected-histogram
                             # continuation line appears truncated here

        rec = last["space-recovered"]
        self.failUnlessEqual(rec["examined-buckets"], 4)
        self.failUnlessEqual(rec["examined-shares"], 4)
        self.failUnlessEqual(rec["actual-buckets"], 2)
        self.failUnlessEqual(rec["original-buckets"], 0)
        self.failUnlessEqual(rec["configured-buckets"], 2)
        self.failUnlessEqual(rec["actual-shares"], 2)
        self.failUnlessEqual(rec["original-shares"], 0)
        self.failUnlessEqual(rec["configured-shares"], 2)
        size = sf0_size + sf2_size
        self.failUnlessEqual(rec["actual-sharebytes"], size)
        self.failUnlessEqual(rec["original-sharebytes"], 0)
        self.failUnlessEqual(rec["configured-sharebytes"], size)
        # different platforms have different notions of "blocks used by
        # this file", so merely assert that it's a number
        self.failUnless(rec["actual-diskbytes"] >= 0,
                        rec["actual-diskbytes"])
        self.failUnless(rec["original-diskbytes"] >= 0,
                        rec["original-diskbytes"])
        self.failUnless(rec["configured-diskbytes"] >= 0,
                        rec["configured-diskbytes"])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Expiration Enabled:"
                          " expired leases will be removed", s)
        date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
        substr = "Leases created or last renewed before %s will be considered expired." % date
        self.failUnlessIn(substr, s)
        self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
    d.addCallback(_check_html)
def test_only_immutable(self):
    """With expiration_sharetypes=("immutable",), only immutable shares
    are expired; mutable shares and all their leases survive.

    NOTE(review): the `now = time.time()` binding, crawler speed-up
    settings, the `def _wait():` header and the trailing `return d`
    appear to be missing from this excerpt.
    """
    basedir = "storage/LeaseCrawler/only_immutable"
    fileutil.make_dirs(basedir)
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("immutable",))
    lc = ss.lease_checker
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    # small helpers to inspect shares/leases for a given storage index
    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    # backdate every lease on every share, so all are candidates
    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # NOTE(review): the enclosing `def _wait():` line appears truncated here
    return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        # immutable shares are gone; mutable shares (and leases) survive
        self.failUnlessEqual(count_shares(immutable_si_0), 0)
        self.failUnlessEqual(count_shares(immutable_si_1), 0)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
    d.addCallback(_check_html)
def test_only_mutable(self):
    """With expiration_sharetypes=("mutable",), only mutable shares are
    expired; immutable shares and all their leases survive.

    NOTE(review): the `now = time.time()` binding, crawler speed-up
    settings, the `def _wait():` header and the trailing `return d`
    appear to be missing from this excerpt.
    """
    basedir = "storage/LeaseCrawler/only_mutable"
    fileutil.make_dirs(basedir)
    then = int(now - 2000)
    ss = StorageServer(basedir, "\x00" * 20,
                       expiration_enabled=True,
                       expiration_mode="cutoff-date",
                       expiration_cutoff_date=then,
                       expiration_sharetypes=("mutable",))
    lc = ss.lease_checker
    webstatus = StorageStatus(ss)

    self.make_shares(ss)
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    # set all leases to be expirable
    new_expiration_time = now - 3000 + 31*24*60*60

    # small helpers to inspect shares/leases for a given storage index
    def count_shares(si):
        return len(list(ss._iter_share_files(si)))
    def _get_sharefile(si):
        return list(ss._iter_share_files(si))[0]
    def count_leases(si):
        return len(list(_get_sharefile(si).get_leases()))

    # backdate every lease on every share, so all are candidates
    sf0 = _get_sharefile(immutable_si_0)
    self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
    sf1 = _get_sharefile(immutable_si_1)
    self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
    self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
    sf2 = _get_sharefile(mutable_si_2)
    self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
    sf3 = _get_sharefile(mutable_si_3)
    self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
    self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

    ss.setServiceParent(self.s)
    # NOTE(review): the enclosing `def _wait():` line appears truncated here
    return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _after_first_cycle(ignored):
        # mutable shares are gone; immutable shares (and leases) survive
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 0)
        self.failUnlessEqual(count_shares(mutable_si_3), 0)
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render1(webstatus))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
    d.addCallback(_check_html)
def test_bad_mode(self):
    """An unrecognized expiration_mode must be rejected at construction
    time with a descriptive ValueError."""
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    err = self.failUnlessRaises(ValueError,
                                StorageServer, workdir, "\x00" * 20,
                                expiration_mode="bogus")
    self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(err))
def test_parse_duration(self):
    """time_format.parse_duration understands day/month/year suffixes,
    with or without a plural 's' or a separating space, and rejects
    unknown units.

    NOTE(review): the DAY/MONTH/YEAR constant bindings appear to be
    missing from this excerpt (presumably DAY = 24*60*60,
    MONTH = 31*DAY, YEAR = 365*DAY -- confirm against the full file).
    """
    p = time_format.parse_duration
    self.failUnlessEqual(p("7days"), 7*DAY)
    self.failUnlessEqual(p("31day"), 31*DAY)
    self.failUnlessEqual(p("60 days"), 60*DAY)
    self.failUnlessEqual(p("2mo"), 2*MONTH)
    self.failUnlessEqual(p("3 month"), 3*MONTH)
    self.failUnlessEqual(p("2years"), 2*YEAR)
    # unknown units must raise, with a helpful message
    e = self.failUnlessRaises(ValueError, p, "2kumquats")
    self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    """time_format.parse_date turns an ISO date into integer epoch
    seconds."""
    parse = time_format.parse_date
    parsed = parse("2009-03-18")
    # must be a plain int (seconds since the epoch), not a float
    self.failUnless(isinstance(parsed, int), parsed)
    self.failUnlessEqual(parsed, 1237334400)
def test_limited_history(self):
    """The lease checker retains only the 10 most recent cycles of
    history.

    NOTE(review): the crawler speed-up settings, the return statements of
    the poll predicate, the `h = ...["history"]` binding and the trailing
    `return d` appear to be missing from this excerpt.
    """
    basedir = "storage/LeaseCrawler/limited_history"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    lc = ss.lease_checker
    # create a few shares, with some leases on them
    self.make_shares(ss)
    ss.setServiceParent(self.s)

    def _wait_until_15_cycles_done():
        last = lc.state["last-cycle-finished"]
        if last is not None and last >= 15:
            # NOTE(review): the predicate's return statements appear
            # truncated here
    d = self.poll(_wait_until_15_cycles_done)

    def _check(ignored):
        # only the latest 10 cycles (numbers 6..15) should be retained
        # NOTE(review): the binding of `h` (the history dict) appears
        # truncated above this line
        self.failUnlessEqual(len(h), 10)
        self.failUnlessEqual(max(h.keys()), 15)
        self.failUnlessEqual(min(h.keys()), 6)
    d.addCallback(_check)
def test_unpredictable_future(self):
    """Early in a cycle (0% complete) the expirer cannot predict the
    future: every estimate field must be None rather than a number.

    NOTE(review): the `s = lc.get_state()` binding, `return d2`/`return d`
    statements and some crawler-timing setup appear to be missing from
    this excerpt.
    """
    basedir = "storage/LeaseCrawler/unpredictable_future"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    lc = ss.lease_checker
    lc.cpu_slice = -1.0 # stop quickly

    self.make_shares(ss)

    ss.setServiceParent(self.s)

    d = fireEventually()
    def _check(ignored):
        # this should fire after the first bucket is complete, but before
        # the first prefix is complete, so the progress-measurer won't
        # think we've gotten far enough to raise our percent-complete
        # above 0%, triggering the cannot-predict-the-future code in
        # expirer.py . This will have to change if/when the
        # progress-measurer gets smart enough to count buckets (we'll
        # have to interrupt it even earlier, before it's finished the
        if "cycle-to-date" not in s:
            # state not populated yet: reschedule ourselves
            d2 = fireEventually()
            d2.addCallback(_check)
        self.failUnlessIn("cycle-to-date", s)
        self.failUnlessIn("estimated-remaining-cycle", s)
        self.failUnlessIn("estimated-current-cycle", s)

        left = s["estimated-remaining-cycle"]["space-recovered"]
        self.failUnlessEqual(left["actual-buckets"], None)
        self.failUnlessEqual(left["original-buckets"], None)
        self.failUnlessEqual(left["configured-buckets"], None)
        self.failUnlessEqual(left["actual-shares"], None)
        self.failUnlessEqual(left["original-shares"], None)
        self.failUnlessEqual(left["configured-shares"], None)
        self.failUnlessEqual(left["actual-diskbytes"], None)
        self.failUnlessEqual(left["original-diskbytes"], None)
        self.failUnlessEqual(left["configured-diskbytes"], None)
        self.failUnlessEqual(left["actual-sharebytes"], None)
        self.failUnlessEqual(left["original-sharebytes"], None)
        self.failUnlessEqual(left["configured-sharebytes"], None)

        # NOTE(review): this re-reads "estimated-remaining-cycle"; given
        # the membership assertion on "estimated-current-cycle" above,
        # that key was probably intended here -- confirm before changing.
        full = s["estimated-remaining-cycle"]["space-recovered"]
        self.failUnlessEqual(full["actual-buckets"], None)
        self.failUnlessEqual(full["original-buckets"], None)
        self.failUnlessEqual(full["configured-buckets"], None)
        self.failUnlessEqual(full["actual-shares"], None)
        self.failUnlessEqual(full["original-shares"], None)
        self.failUnlessEqual(full["configured-shares"], None)
        self.failUnlessEqual(full["actual-diskbytes"], None)
        self.failUnlessEqual(full["original-diskbytes"], None)
        self.failUnlessEqual(full["configured-diskbytes"], None)
        self.failUnlessEqual(full["actual-sharebytes"], None)
        self.failUnlessEqual(full["original-sharebytes"], None)
        self.failUnlessEqual(full["configured-sharebytes"], None)

    d.addCallback(_check)
def test_no_st_blocks(self):
    """When os.stat() results lack .st_blocks (as on some platforms),
    the expirer must fall back to reporting diskbytes == sharebytes.

    NOTE(review): several lines appear to be missing from this excerpt
    (crawler speed-up settings, the `def _wait():` header, the
    `s = lc.get_state()` binding and the trailing `return d`).
    """
    basedir = "storage/LeaseCrawler/no_st_blocks"
    fileutil.make_dirs(basedir)
    ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                    expiration_mode="age",
                                    expiration_override_lease_duration=-1000)
    # a negative expiration_time= means the "configured-"
    # space-recovered counts will be non-zero, since all shares will have
    # make it start sooner than usual.
    lc = ss.lease_checker

    self.make_shares(ss)
    ss.setServiceParent(self.s)
    # NOTE(review): the enclosing `def _wait():` line appears truncated here
    return bool(lc.get_state()["last-cycle-finished"] is not None)
    d = self.poll(_wait)

    def _check(ignored):
        # NOTE(review): the `s = lc.get_state()` binding appears truncated here
        last = s["history"][0]
        rec = last["space-recovered"]
        self.failUnlessEqual(rec["configured-buckets"], 4)
        self.failUnlessEqual(rec["configured-shares"], 4)
        self.failUnless(rec["configured-sharebytes"] > 0,
                        rec["configured-sharebytes"])
        # without the .st_blocks field in os.stat() results, we should be
        # reporting diskbytes==sharebytes
        self.failUnlessEqual(rec["configured-sharebytes"],
                             rec["configured-diskbytes"])
    d.addCallback(_check)
def test_share_corruption(self):
    """A corrupted share file must be reported in corrupt-shares (in the
    in-cycle state, the JSON, the HTML, and the post-cycle history)
    without stopping the lease crawler.

    NOTE(review): several lines appear to be missing from this excerpt
    (the list's closing bracket, the open/seek/close around the
    "BAD MAGIC" write, `s = lc.get_state()` bindings, `return d2`, the
    `def _wait():` header, the `_cleanup` wrapper and `return d`).
    """
    # tell the PollMixin that these logged errors are expected
    self._poll_should_ignore_these_errors = [
        UnknownMutableContainerVersionError,
        UnknownImmutableContainerVersionError,
        # NOTE(review): the closing "]" appears truncated here
    basedir = "storage/LeaseCrawler/share_corruption"
    fileutil.make_dirs(basedir)
    ss = InstrumentedStorageServer(basedir, "\x00" * 20)
    w = StorageStatus(ss)
    # make it start sooner than usual.
    lc = ss.lease_checker
    lc.stop_after_first_bucket = True

    # create a few shares, with some leases on them
    self.make_shares(ss)

    # now corrupt one, and make sure the lease-checker keeps going
    [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
    first = min(self.sis)
    first_b32 = base32.b2a(first)
    fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
    # NOTE(review): the open/seek lines binding `f` appear truncated here
    f.write("BAD MAGIC")

    # if get_share_file() doesn't see the correct mutable magic, it
    # assumes the file is an immutable share, and then
    # immutable.ShareFile sees a bad version. So regardless of which kind
    # of share we corrupted, this will trigger an
    # UnknownImmutableContainerVersionError.

    # also create an empty bucket
    empty_si = base32.b2a("\x04"*16)
    empty_bucket_dir = os.path.join(ss.sharedir,
                                    storage_index_to_dir(empty_si))
    fileutil.make_dirs(empty_bucket_dir)

    ss.setServiceParent(self.s)

    d = fireEventually()

    # now examine the state right after the first bucket has been
    def _after_first_bucket(ignored):
        # NOTE(review): the `s = lc.get_state()` binding appears truncated here
        if "cycle-to-date" not in s:
            # state not populated yet: reschedule ourselves
            d2 = fireEventually()
            d2.addCallback(_after_first_bucket)
        so_far = s["cycle-to-date"]
        rec = so_far["space-recovered"]
        # only the corrupt bucket was visited; its share was not counted
        self.failUnlessEqual(rec["examined-buckets"], 1)
        self.failUnlessEqual(rec["examined-shares"], 0)
        self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_bucket)

    d.addCallback(lambda ign: self.render_json(w))
    def _check_json(json):
        data = simplejson.loads(json)
        # grr. json turns all dict keys into strings.
        so_far = data["lease-checker"]["cycle-to-date"]
        corrupt_shares = so_far["corrupt-shares"]
        # it also turns all tuples into lists
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html)

    # NOTE(review): the enclosing `def _wait():` line appears truncated here
    return bool(lc.get_state()["last-cycle-finished"] is not None)
    d.addCallback(lambda ign: self.poll(_wait))

    def _after_first_cycle(ignored):
        # NOTE(review): the `s = lc.get_state()` binding appears truncated here
        last = s["history"][0]
        rec = last["space-recovered"]
        # the corrupt share is excluded from the examined-shares count
        self.failUnlessEqual(rec["examined-buckets"], 5)
        self.failUnlessEqual(rec["examined-shares"], 3)
        self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
    d.addCallback(_after_first_cycle)
    d.addCallback(lambda ign: self.render_json(w))
    def _check_json_history(json):
        data = simplejson.loads(json)
        last = data["lease-checker"]["history"]["0"]
        corrupt_shares = last["corrupt-shares"]
        self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
    d.addCallback(_check_json_history)
    d.addCallback(lambda ign: self.render1(w))
    def _check_html_history(html):
        s = remove_tags(html)
        self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
    d.addCallback(_check_html_history)

    # discard the expected container-version errors logged above
    self.flushLoggedErrors(UnknownMutableContainerVersionError,
                           UnknownImmutableContainerVersionError)
def render_json(self, page):
    """Render `page` with t=json and return a Deferred that fires with
    the JSON body."""
    d = self.render1(page, args={"t": ["json"]})
    # callers chain .addCallback on our result (see _check_json users),
    # so the Deferred must be returned; without this the method returned
    # None and the chained callbacks would receive None instead of JSON.
    return d
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    """Tests for the StorageStatus web page renderer."""

    def setUp(self):
        # each test gets a fresh MultiService to parent servers under
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        # return the Deferred so trial waits for shutdown to complete
        return self.s.stopService()
def test_no_server(self):
    """With no storage server attached, the page shows a clear
    'no server' heading."""
    page = StorageStatus(None)
    rendered = page.renderSynchronously()
    self.failUnlessIn("<h1>No Storage Server Running</h1>", rendered)
def test_status(self):
    """Render the status page for a live server, checking both the HTML
    and the JSON (t=json) forms.

    NOTE(review): the initial `d = self.render1(w)` binding, the
    `s = data["stats"]` binding inside _check_json, and the trailing
    `return d` appear to be missing from this excerpt.
    """
    basedir = "storage/WebStatus/status"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    def _check_html(html):
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
    d.addCallback(_check_html)
    d.addCallback(lambda ign: self.render_json(w))
    def _check_json(json):
        data = simplejson.loads(json)
        # NOTE(review): the binding of `s` (presumably data["stats"])
        # appears truncated here
        self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
        self.failUnlessEqual(s["storage_server.reserved_space"], 0)
        self.failUnlessIn("bucket-counter", data)
        self.failUnlessIn("lease-checker", data)
    d.addCallback(_check_json)
def render_json(self, page):
    """Render `page` with t=json and return a Deferred that fires with
    the JSON body."""
    d = self.render1(page, args={"t": ["json"]})
    # callers chain .addCallback on our result (see _check_json users),
    # so the Deferred must be returned; without this the method returned
    # None and the chained callbacks would receive None instead of JSON.
    return d
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_no_disk_stats(self, mock_get_disk_stats):
    """If the platform has no disk-stats API at all (AttributeError),
    the page must still render, showing '?' for the unknown sizes and
    leaving the server accepting shares."""
    mock_get_disk_stats.side_effect = AttributeError()
    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    rendered = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", rendered)
    text = remove_tags(rendered)
    self.failUnlessIn("Accepting new shares: Yes", text)
    self.failUnlessIn("Total disk space: ?", text)
    self.failUnlessIn("Space Available to Tahoe: ?", text)
    # unknown stats map to an unknown (None) available-space figure
    self.failUnless(server.get_available_space() is None)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_bad_disk_stats(self, mock_get_disk_stats):
    """If the disk-stats API exists but the call fails (OSError), the
    server must stop accepting shares and report zero available
    space, while the page still renders with '?' placeholders."""
    mock_get_disk_stats.side_effect = OSError()
    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    rendered = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", rendered)
    text = remove_tags(rendered)
    self.failUnlessIn("Accepting new shares: No", text)
    self.failUnlessIn("Total disk space: ?", text)
    self.failUnlessIn("Space Available to Tahoe: ?", text)
    # a failing stats call is treated as "no space available"
    self.failUnlessEqual(server.get_available_space(), 0)
def test_readonly(self):
    """A server started with readonly_storage=True must advertise that
    it is not accepting new shares."""
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    rendered = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", rendered)
    self.failUnlessIn("Accepting new shares: No", remove_tags(rendered))
def test_reserved(self):
    """The configured reserved_space must appear on the status page in
    both abbreviated and exact forms."""
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    rendered = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", rendered)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)",
                      remove_tags(rendered))
def test_huge_reserved(self):
    """Render the status page for a server with reserved space.

    NOTE(review): this test is currently an exact duplicate of
    test_reserved -- same basedir ("storage/WebStatus/reserved") and the
    same modest 10e6-byte reservation -- so despite its name it does not
    exercise a genuinely huge reservation. Consider a distinct basedir
    and a value larger than 2**32; confirm the expected abbreviated
    rendering against util.abbreviate before changing.
    """
    basedir = "storage/WebStatus/reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    s = remove_tags(html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
def test_util(self):
    """Exercise the small rendering helpers on a server-less
    StorageStatus: the space renderers and remove_prefix."""
    status = StorageStatus(None)
    # (renderer, input, expected) triples for the two space renderers
    for render, value, expected in [
        (status.render_space, None, "?"),
        (status.render_space, 10e6, "10000000"),
        (status.render_abbrev_space, None, "?"),
        (status.render_abbrev_space, 10e6, "10.00 MB"),
    ]:
        self.failUnlessEqual(render(None, value), expected)
    # remove_prefix strips a matching prefix and returns None otherwise
    self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
    self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)