1 import time, os.path, platform, stat, re, simplejson, struct
3 from allmydata.util import log
7 from twisted.trial import unittest
9 from twisted.internet import defer
10 from twisted.application import service
11 from foolscap.api import fireEventually
13 from allmydata import interfaces
14 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
15 from allmydata.storage.server import StorageServer
16 from allmydata.storage.mutable import MutableShareFile
17 from allmydata.storage.immutable import BucketWriter, BucketReader
18 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
19 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
20 from allmydata.storage.lease import LeaseInfo
21 from allmydata.storage.crawler import BucketCountingCrawler
22 from allmydata.storage.expirer import LeaseCheckingCrawler
23 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
25 from allmydata.interfaces import BadWriteEnablerError
26 from allmydata.test.common import LoggingServiceParent
27 from allmydata.test.common_web import WebRenderingMixin
28 from allmydata.web.storage import StorageStatus, remove_prefix
33 def __init__(self, ignore_disconnectors=False):
34 self.ignore = ignore_disconnectors
35 self.disconnectors = {}
36 def notifyOnDisconnect(self, f, *args, **kwargs):
40 self.disconnectors[m] = (f, args, kwargs)
42 def dontNotifyOnDisconnect(self, marker):
45 del self.disconnectors[marker]
47 class FakeStatsProvider:
48 def count(self, name, delta=1):
50 def register_producer(self, producer):
53 class Bucket(unittest.TestCase):
54 def make_workdir(self, name):
55 basedir = os.path.join("storage", "Bucket", name)
56 incoming = os.path.join(basedir, "tmp", "bucket")
57 final = os.path.join(basedir, "bucket")
58 fileutil.make_dirs(basedir)
59 fileutil.make_dirs(os.path.join(basedir, "tmp"))
60 return incoming, final
62 def bucket_writer_closed(self, bw, consumed):
64 def add_latency(self, category, latency):
66 def count(self, name, delta=1):
71 renew_secret = os.urandom(32)
72 cancel_secret = os.urandom(32)
73 expiration_time = time.time() + 5000
74 return LeaseInfo(owner_num, renew_secret, cancel_secret,
75 expiration_time, "\x00" * 20)
77 def test_create(self):
78 incoming, final = self.make_workdir("test_create")
79 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
81 bw.remote_write(0, "a"*25)
82 bw.remote_write(25, "b"*25)
83 bw.remote_write(50, "c"*25)
84 bw.remote_write(75, "d"*7)
87 def test_readwrite(self):
88 incoming, final = self.make_workdir("test_readwrite")
89 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
91 bw.remote_write(0, "a"*25)
92 bw.remote_write(25, "b"*25)
93 bw.remote_write(50, "c"*7) # last block may be short
97 br = BucketReader(self, bw.finalhome)
98 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
99 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
100 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
104 def callRemote(self, methname, *args, **kwargs):
106 meth = getattr(self.target, "remote_" + methname)
107 return meth(*args, **kwargs)
108 return defer.maybeDeferred(_call)
110 class BucketProxy(unittest.TestCase):
111 def make_bucket(self, name, size):
112 basedir = os.path.join("storage", "BucketProxy", name)
113 incoming = os.path.join(basedir, "tmp", "bucket")
114 final = os.path.join(basedir, "bucket")
115 fileutil.make_dirs(basedir)
116 fileutil.make_dirs(os.path.join(basedir, "tmp"))
117 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
123 def make_lease(self):
125 renew_secret = os.urandom(32)
126 cancel_secret = os.urandom(32)
127 expiration_time = time.time() + 5000
128 return LeaseInfo(owner_num, renew_secret, cancel_secret,
129 expiration_time, "\x00" * 20)
131 def bucket_writer_closed(self, bw, consumed):
133 def add_latency(self, category, latency):
135 def count(self, name, delta=1):
138 def test_create(self):
139 bw, rb, sharefname = self.make_bucket("test_create", 500)
140 bp = WriteBucketProxy(rb,
145 uri_extension_size_max=500, nodeid=None)
146 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
148 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
149 # Let's pretend each share has 100 bytes of data, and that there are
150 # 4 segments (25 bytes each), and 8 shares total. So the two
151 # per-segment merkle trees (crypttext_hash_tree,
152 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
153 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
154 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
155 # long. That should make the whole share:
157 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
158 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
160 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
162 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
164 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
166 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
168 uri_extension = "s" + "E"*498 + "e"
170 bw, rb, sharefname = self.make_bucket(name, sharesize)
176 uri_extension_size_max=len(uri_extension),
180 d.addCallback(lambda res: bp.put_block(0, "a"*25))
181 d.addCallback(lambda res: bp.put_block(1, "b"*25))
182 d.addCallback(lambda res: bp.put_block(2, "c"*25))
183 d.addCallback(lambda res: bp.put_block(3, "d"*20))
184 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
185 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
186 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
187 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
188 d.addCallback(lambda res: bp.close())
190 # now read everything back
191 def _start_reading(res):
192 br = BucketReader(self, sharefname)
195 rbp = rbp_class(rb, peerid="abc", storage_index="")
196 self.failUnlessIn("to peer", repr(rbp))
197 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
199 d1 = rbp.get_block_data(0, 25, 25)
200 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
201 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
202 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
203 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
204 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
205 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
206 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
208 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
209 d1.addCallback(lambda res:
210 self.failUnlessEqual(res, crypttext_hashes))
211 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
212 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
213 d1.addCallback(lambda res: rbp.get_share_hashes())
214 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
215 d1.addCallback(lambda res: rbp.get_uri_extension())
216 d1.addCallback(lambda res:
217 self.failUnlessEqual(res, uri_extension))
221 d.addCallback(_start_reading)
225 def test_readwrite_v1(self):
226 return self._do_test_readwrite("test_readwrite_v1",
227 0x24, WriteBucketProxy, ReadBucketProxy)
229 def test_readwrite_v2(self):
230 return self._do_test_readwrite("test_readwrite_v2",
231 0x44, WriteBucketProxy_v2, ReadBucketProxy)
233 class Server(unittest.TestCase):
236 self.sparent = LoggingServiceParent()
237 self.sparent.startService()
238 self._lease_secret = itertools.count()
240 return self.sparent.stopService()
242 def workdir(self, name):
243 basedir = os.path.join("storage", "Server", name)
246 def create(self, name, reserved_space=0, klass=StorageServer):
247 workdir = self.workdir(name)
248 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
249 stats_provider=FakeStatsProvider())
250 ss.setServiceParent(self.sparent)
253 def test_create(self):
254 self.create("test_create")
256 def allocate(self, ss, storage_index, sharenums, size, canary=None):
257 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
258 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260 canary = FakeCanary()
261 return ss.remote_allocate_buckets(storage_index,
262 renew_secret, cancel_secret,
263 sharenums, size, canary)
265 def test_large_share(self):
266 syslow = platform.system().lower()
267 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
268 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
270 avail = fileutil.get_available_space('.', 2**14)
272 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
274 ss = self.create("test_large_share")
276 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
277 self.failUnlessEqual(already, set())
278 self.failUnlessEqual(set(writers.keys()), set([0]))
280 shnum, bucket = writers.items()[0]
281 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
282 bucket.remote_write(2**32, "ab")
283 bucket.remote_close()
285 readers = ss.remote_get_buckets("allocate")
286 reader = readers[shnum]
287 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
289 def test_dont_overfill_dirs(self):
291 This test asserts that if you add a second share whose storage index
292 share lots of leading bits with an extant share (but isn't the exact
293 same storage index), this won't add an entry to the share directory.
295 ss = self.create("test_dont_overfill_dirs")
296 already, writers = self.allocate(ss, "storageindex", [0], 10)
297 for i, wb in writers.items():
298 wb.remote_write(0, "%10d" % i)
300 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
302 children_of_storedir = set(os.listdir(storedir))
304 # Now store another one under another storageindex that has leading
305 # chars the same as the first storageindex.
306 already, writers = self.allocate(ss, "storageindey", [0], 10)
307 for i, wb in writers.items():
308 wb.remote_write(0, "%10d" % i)
310 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
312 new_children_of_storedir = set(os.listdir(storedir))
313 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
315 def test_remove_incoming(self):
316 ss = self.create("test_remove_incoming")
317 already, writers = self.allocate(ss, "vid", range(3), 10)
318 for i,wb in writers.items():
319 wb.remote_write(0, "%10d" % i)
321 incoming_share_dir = wb.incominghome
322 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
323 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
324 incoming_dir = os.path.dirname(incoming_prefix_dir)
325 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
326 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
327 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
329 def test_abort(self):
330 # remote_abort, when called on a writer, should make sure that
331 # the allocated size of the bucket is not counted by the storage
332 # server when accounting for space.
333 ss = self.create("test_abort")
334 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
335 self.failIfEqual(ss.allocated_size(), 0)
337 # Now abort the writers.
338 for writer in writers.itervalues():
339 writer.remote_abort()
340 self.failUnlessEqual(ss.allocated_size(), 0)
343 def test_allocate(self):
344 ss = self.create("test_allocate")
346 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
348 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
349 self.failUnlessEqual(already, set())
350 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
352 # while the buckets are open, they should not count as readable
353 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
356 for i,wb in writers.items():
357 wb.remote_write(0, "%25d" % i)
359 # aborting a bucket that was already closed is a no-op
362 # now they should be readable
363 b = ss.remote_get_buckets("allocate")
364 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
365 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
367 self.failUnlessIn("BucketReader", b_str)
368 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
370 # now if we ask about writing again, the server should offer those
371 # three buckets as already present. It should offer them even if we
372 # don't ask about those specific ones.
373 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
374 self.failUnlessEqual(already, set([0,1,2]))
375 self.failUnlessEqual(set(writers.keys()), set([3,4]))
377 # while those two buckets are open for writing, the server should
378 # refuse to offer them to uploaders
380 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
381 self.failUnlessEqual(already2, set([0,1,2]))
382 self.failUnlessEqual(set(writers2.keys()), set([5]))
384 # aborting the writes should remove the tempfiles
385 for i,wb in writers2.items():
387 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
388 self.failUnlessEqual(already2, set([0,1,2]))
389 self.failUnlessEqual(set(writers2.keys()), set([5]))
391 for i,wb in writers2.items():
393 for i,wb in writers.items():
396 def test_bad_container_version(self):
397 ss = self.create("test_bad_container_version")
398 a,w = self.allocate(ss, "si1", [0], 10)
399 w[0].remote_write(0, "\xff"*10)
402 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
405 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
408 ss.remote_get_buckets("allocate")
410 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
411 ss.remote_get_buckets, "si1")
412 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
414 def test_disconnect(self):
415 # simulate a disconnection
416 ss = self.create("test_disconnect")
417 canary = FakeCanary()
418 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
419 self.failUnlessEqual(already, set())
420 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
421 for (f,args,kwargs) in canary.disconnectors.values():
426 # that ought to delete the incoming shares
427 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
428 self.failUnlessEqual(already, set())
429 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
431 @mock.patch('allmydata.util.fileutil.get_disk_stats')
432 def test_reserved_space(self, mock_get_disk_stats):
434 mock_get_disk_stats.return_value = {
435 'free_for_nonroot': 15000,
436 'avail': max(15000 - reserved_space, 0),
439 ss = self.create("test_reserved_space", reserved_space=reserved_space)
440 # 15k available, 10k reserved, leaves 5k for shares
442 # a newly created and filled share incurs this much overhead, beyond
443 # the size we request.
445 LEASE_SIZE = 4+32+32+4
446 canary = FakeCanary(True)
447 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
448 self.failUnlessEqual(len(writers), 3)
449 # now the StorageServer should have 3000 bytes provisionally
450 # allocated, allowing only 2000 more to be claimed
451 self.failUnlessEqual(len(ss._active_writers), 3)
453 # allocating 1001-byte shares only leaves room for one
454 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
455 self.failUnlessEqual(len(writers2), 1)
456 self.failUnlessEqual(len(ss._active_writers), 4)
458 # we abandon the first set, so their provisional allocation should be
462 self.failUnlessEqual(len(ss._active_writers), 1)
463 # now we have a provisional allocation of 1001 bytes
465 # and we close the second set, so their provisional allocation should
466 # become real, long-term allocation, and grows to include the
468 for bw in writers2.values():
469 bw.remote_write(0, "a"*25)
474 self.failUnlessEqual(len(ss._active_writers), 0)
476 allocated = 1001 + OVERHEAD + LEASE_SIZE
478 # we have to manually increase available, since we're not doing real
480 mock_get_disk_stats.return_value = {
481 'free_for_nonroot': 15000 - allocated,
482 'avail': max(15000 - allocated - reserved_space, 0),
485 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
486 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
487 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
488 self.failUnlessEqual(len(writers3), 39)
489 self.failUnlessEqual(len(ss._active_writers), 39)
493 self.failUnlessEqual(len(ss._active_writers), 0)
494 ss.disownServiceParent()
498 basedir = self.workdir("test_seek_behavior")
499 fileutil.make_dirs(basedir)
500 filename = os.path.join(basedir, "testfile")
501 f = open(filename, "wb")
504 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
505 # files. mode="a" preserves previous contents but does not allow
506 # seeking-to-create-holes. mode="r+" allows both.
507 f = open(filename, "rb+")
511 filelen = os.stat(filename)[stat.ST_SIZE]
512 self.failUnlessEqual(filelen, 100+3)
513 f2 = open(filename, "rb")
514 self.failUnlessEqual(f2.read(5), "start")
517 def test_leases(self):
518 ss = self.create("test_leases")
519 canary = FakeCanary()
523 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
524 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
525 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
526 sharenums, size, canary)
527 self.failUnlessEqual(len(already), 0)
528 self.failUnlessEqual(len(writers), 5)
529 for wb in writers.values():
532 leases = list(ss.get_leases("si0"))
533 self.failUnlessEqual(len(leases), 1)
534 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
536 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
537 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
538 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
539 sharenums, size, canary)
540 for wb in writers.values():
543 # take out a second lease on si1
544 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
545 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
546 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
547 sharenums, size, canary)
548 self.failUnlessEqual(len(already), 5)
549 self.failUnlessEqual(len(writers), 0)
551 leases = list(ss.get_leases("si1"))
552 self.failUnlessEqual(len(leases), 2)
553 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
555 # and a third lease, using add-lease
556 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
557 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
558 ss.remote_add_lease("si1", rs2a, cs2a)
559 leases = list(ss.get_leases("si1"))
560 self.failUnlessEqual(len(leases), 3)
561 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
563 # add-lease on a missing storage index is silently ignored
564 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
566 # check that si0 is readable
567 readers = ss.remote_get_buckets("si0")
568 self.failUnlessEqual(len(readers), 5)
570 # renew the first lease. Only the proper renew_secret should work
571 ss.remote_renew_lease("si0", rs0)
572 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
573 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
575 # check that si0 is still readable
576 readers = ss.remote_get_buckets("si0")
577 self.failUnlessEqual(len(readers), 5)
580 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
581 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
582 ss.remote_cancel_lease("si0", cs0)
584 # si0 should now be gone
585 readers = ss.remote_get_buckets("si0")
586 self.failUnlessEqual(len(readers), 0)
587 # and the renew should no longer work
588 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
591 # cancel the first lease on si1, leaving the second and third in place
592 ss.remote_cancel_lease("si1", cs1)
593 readers = ss.remote_get_buckets("si1")
594 self.failUnlessEqual(len(readers), 5)
595 # the corresponding renew should no longer work
596 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
598 leases = list(ss.get_leases("si1"))
599 self.failUnlessEqual(len(leases), 2)
600 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
602 ss.remote_renew_lease("si1", rs2)
603 # cancelling the second and third should make it go away
604 ss.remote_cancel_lease("si1", cs2)
605 ss.remote_cancel_lease("si1", cs2a)
606 readers = ss.remote_get_buckets("si1")
607 self.failUnlessEqual(len(readers), 0)
608 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
609 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
610 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
612 leases = list(ss.get_leases("si1"))
613 self.failUnlessEqual(len(leases), 0)
616 # test overlapping uploads
617 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
618 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
619 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
620 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
621 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
622 sharenums, size, canary)
623 self.failUnlessEqual(len(already), 0)
624 self.failUnlessEqual(len(writers), 5)
625 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
626 sharenums, size, canary)
627 self.failUnlessEqual(len(already2), 0)
628 self.failUnlessEqual(len(writers2), 0)
629 for wb in writers.values():
632 leases = list(ss.get_leases("si3"))
633 self.failUnlessEqual(len(leases), 1)
635 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
636 sharenums, size, canary)
637 self.failUnlessEqual(len(already3), 5)
638 self.failUnlessEqual(len(writers3), 0)
640 leases = list(ss.get_leases("si3"))
641 self.failUnlessEqual(len(leases), 2)
643 def test_readonly(self):
644 workdir = self.workdir("test_readonly")
645 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
646 ss.setServiceParent(self.sparent)
648 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
649 self.failUnlessEqual(already, set())
650 self.failUnlessEqual(writers, {})
652 stats = ss.get_stats()
653 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
654 if "storage_server.disk_avail" in stats:
655 # Some platforms may not have an API to get disk stats.
656 # But if there are stats, readonly_storage means disk_avail=0
657 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
659 def test_discard(self):
660 # discard is really only used for other tests, but we test it anyways
661 workdir = self.workdir("test_discard")
662 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
663 ss.setServiceParent(self.sparent)
665 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
666 self.failUnlessEqual(already, set())
667 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
668 for i,wb in writers.items():
669 wb.remote_write(0, "%25d" % i)
671 # since we discard the data, the shares should be present but sparse.
672 # Since we write with some seeks, the data we read back will be all
674 b = ss.remote_get_buckets("vid")
675 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
676 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
678 def test_advise_corruption(self):
679 workdir = self.workdir("test_advise_corruption")
680 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
681 ss.setServiceParent(self.sparent)
683 si0_s = base32.b2a("si0")
684 ss.remote_advise_corrupt_share("immutable", "si0", 0,
685 "This share smells funny.\n")
686 reportdir = os.path.join(workdir, "corruption-advisories")
687 reports = os.listdir(reportdir)
688 self.failUnlessEqual(len(reports), 1)
689 report_si0 = reports[0]
690 self.failUnlessIn(si0_s, report_si0)
691 f = open(os.path.join(reportdir, report_si0), "r")
694 self.failUnlessIn("type: immutable", report)
695 self.failUnlessIn("storage_index: %s" % si0_s, report)
696 self.failUnlessIn("share_number: 0", report)
697 self.failUnlessIn("This share smells funny.", report)
699 # test the RIBucketWriter version too
700 si1_s = base32.b2a("si1")
701 already,writers = self.allocate(ss, "si1", [1], 75)
702 self.failUnlessEqual(already, set())
703 self.failUnlessEqual(set(writers.keys()), set([1]))
704 writers[1].remote_write(0, "data")
705 writers[1].remote_close()
707 b = ss.remote_get_buckets("si1")
708 self.failUnlessEqual(set(b.keys()), set([1]))
709 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
711 reports = os.listdir(reportdir)
712 self.failUnlessEqual(len(reports), 2)
713 report_si1 = [r for r in reports if si1_s in r][0]
714 f = open(os.path.join(reportdir, report_si1), "r")
717 self.failUnlessIn("type: immutable", report)
718 self.failUnlessIn("storage_index: %s" % si1_s, report)
719 self.failUnlessIn("share_number: 1", report)
720 self.failUnlessIn("This share tastes like dust.", report)
724 class MutableServer(unittest.TestCase):
727 self.sparent = LoggingServiceParent()
728 self._lease_secret = itertools.count()
730 return self.sparent.stopService()
732 def workdir(self, name):
733 basedir = os.path.join("storage", "MutableServer", name)
736 def create(self, name):
737 workdir = self.workdir(name)
738 ss = StorageServer(workdir, "\x00" * 20)
739 ss.setServiceParent(self.sparent)
742 def test_create(self):
743 self.create("test_create")
745 def write_enabler(self, we_tag):
746 return hashutil.tagged_hash("we_blah", we_tag)
748 def renew_secret(self, tag):
749 return hashutil.tagged_hash("renew_blah", str(tag))
751 def cancel_secret(self, tag):
752 return hashutil.tagged_hash("cancel_blah", str(tag))
754 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
755 write_enabler = self.write_enabler(we_tag)
756 renew_secret = self.renew_secret(lease_tag)
757 cancel_secret = self.cancel_secret(lease_tag)
758 rstaraw = ss.remote_slot_testv_and_readv_and_writev
759 testandwritev = dict( [ (shnum, ([], [], None) )
760 for shnum in sharenums ] )
762 rc = rstaraw(storage_index,
763 (write_enabler, renew_secret, cancel_secret),
766 (did_write, readv_data) = rc
767 self.failUnless(did_write)
768 self.failUnless(isinstance(readv_data, dict))
769 self.failUnlessEqual(len(readv_data), 0)
771 def test_bad_magic(self):
772 ss = self.create("test_bad_magic")
773 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
774 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
779 read = ss.remote_slot_readv
780 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
781 read, "si1", [0], [(0,10)])
782 self.failUnlessIn(" had magic ", str(e))
783 self.failUnlessIn(" but we wanted ", str(e))
785 def test_container_size(self):
786 ss = self.create("test_container_size")
787 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
789 read = ss.remote_slot_readv
790 rstaraw = ss.remote_slot_testv_and_readv_and_writev
791 secrets = ( self.write_enabler("we1"),
792 self.renew_secret("we1"),
793 self.cancel_secret("we1") )
794 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
795 answer = rstaraw("si1", secrets,
796 {0: ([], [(0,data)], len(data)+12)},
798 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
800 # trying to make the container too large will raise an exception
801 TOOBIG = MutableShareFile.MAX_SIZE + 10
802 self.failUnlessRaises(DataTooLargeError,
803 rstaraw, "si1", secrets,
804 {0: ([], [(0,data)], TOOBIG)},
807 # it should be possible to make the container smaller, although at
808 # the moment this doesn't actually affect the share, unless the
809 # container size is dropped to zero, in which case the share is
811 answer = rstaraw("si1", secrets,
812 {0: ([], [(0,data)], len(data)+8)},
814 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
816 answer = rstaraw("si1", secrets,
817 {0: ([], [(0,data)], 0)},
819 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
821 read_answer = read("si1", [0], [(0,10)])
822 self.failUnlessEqual(read_answer, {})
824 def test_allocate(self):
825 ss = self.create("test_allocate")
826 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
829 read = ss.remote_slot_readv
830 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
832 self.failUnlessEqual(read("si1", [], [(0, 10)]),
833 {0: [""], 1: [""], 2: [""]})
834 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
838 secrets = ( self.write_enabler("we1"),
839 self.renew_secret("we1"),
840 self.cancel_secret("we1") )
841 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
842 write = ss.remote_slot_testv_and_readv_and_writev
843 answer = write("si1", secrets,
844 {0: ([], [(0,data)], None)},
846 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
848 self.failUnlessEqual(read("si1", [0], [(0,20)]),
849 {0: ["00000000001111111111"]})
850 self.failUnlessEqual(read("si1", [0], [(95,10)]),
852 #self.failUnlessEqual(s0.remote_get_length(), 100)
854 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
855 f = self.failUnlessRaises(BadWriteEnablerError,
856 write, "si1", bad_secrets,
858 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
860 # this testv should fail
861 answer = write("si1", secrets,
862 {0: ([(0, 12, "eq", "444444444444"),
863 (20, 5, "eq", "22222"),
870 self.failUnlessEqual(answer, (False,
871 {0: ["000000000011", "22222"],
875 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
878 answer = write("si1", secrets,
879 {0: ([(10, 5, "lt", "11111"),
886 self.failUnlessEqual(answer, (False,
891 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
894 def test_operators(self):
895 # test operators, the data we're comparing is '11111' in all cases.
896 # test both fail+pass, reset data after each one.
897 ss = self.create("test_operators")
899 secrets = ( self.write_enabler("we1"),
900 self.renew_secret("we1"),
901 self.cancel_secret("we1") )
902 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
903 write = ss.remote_slot_testv_and_readv_and_writev
904 read = ss.remote_slot_readv
907 write("si1", secrets,
908 {0: ([], [(0,data)], None)},
914 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
919 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
920 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
921 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
924 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
929 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
930 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
933 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
938 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
939 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
943 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
948 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
949 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
952 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
957 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
958 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
961 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
966 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
967 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
971 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
976 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
977 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
980 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
985 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
986 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
990 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
995 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
996 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
999 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1004 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1005 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1009 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1014 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1015 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1018 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1023 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1024 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1027 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1032 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1033 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1037 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1042 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1043 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1046 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1051 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1052 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1055 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1060 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1061 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1064 # finally, test some operators against empty shares
1065 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1070 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1071 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Test: mutable-slot vectorized read. Writes three shares (sharenums 0-2,
# 100 bytes each: "000...", "111...", "222...") into slot "si1" via
# remote_slot_testv_and_readv_and_writev, then reads the first 10 bytes of
# every share with remote_slot_readv (empty share list == all shares).
# NOTE(review): this listing has elided lines (inner numbering gaps at
# 1087, 1089, 1092+); code below is reproduced verbatim from what is visible.
1074 def test_readv(self):
1075 ss = self.create("test_readv")
1076 secrets = ( self.write_enabler("we1"),
1077 self.renew_secret("we1"),
1078 self.cancel_secret("we1") )
1079 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1080 write = ss.remote_slot_testv_and_readv_and_writev
1081 read = ss.remote_slot_readv
# 'data' is immediately rebound below; the scalar string above is unused here.
1082 data = [("%d" % i) * 100 for i in range(3)]
1083 rc = write("si1", secrets,
1084 {0: ([], [(0,data[0])], None),
1085 1: ([], [(0,data[1])], None),
1086 2: ([], [(0,data[2])], None),
1088 self.failUnlessEqual(rc, (True, {}))
1090 answer = read("si1", [], [(0, 10)])
1091 self.failUnlessEqual(answer, {0: ["0"*10],
# Assert two lease lists are pairwise equal on owner number, renew/cancel
# secrets and nodeid, but deliberately ignore expiration_time -- used after
# operations (container moves, renewals) that legitimately re-timestamp leases.
# NOTE(review): lines 1098-1099 are elided in this listing (presumably
# `a = leases_a[i]` / `b = leases_b[i]` -- confirm against the full file).
1095 def compare_leases_without_timestamps(self, leases_a, leases_b):
1096 self.failUnlessEqual(len(leases_a), len(leases_b))
1097 for i in range(len(leases_a)):
1100 self.failUnlessEqual(a.owner_num, b.owner_num)
1101 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1102 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1103 self.failUnlessEqual(a.nodeid, b.nodeid)
# Strict variant of compare_leases_without_timestamps: additionally requires
# the expiration_time of each lease pair to match exactly -- used to verify
# that reads and failed renew/cancel attempts do NOT touch lease timestamps.
# NOTE(review): lines 1108-1109 are elided in this listing (presumably the
# per-index `a`/`b` bindings -- confirm against the full file).
1105 def compare_leases(self, leases_a, leases_b):
1106 self.failUnlessEqual(len(leases_a), len(leases_b))
1107 for i in range(len(leases_a)):
1110 self.failUnlessEqual(a.owner_num, b.owner_num)
1111 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1112 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1113 self.failUnlessEqual(a.nodeid, b.nodeid)
1114 self.failUnlessEqual(a.expiration_time, b.expiration_time)
# Test: full lease lifecycle on a mutable slot -- creation, implicit renewal
# via re-write with the same secrets, explicit renew/cancel, lease survival
# across container expansion, and slot deletion once the last lease is gone.
# NOTE(review): this listing has elided lines (e.g. the `def secrets(n):`
# header before 1119, `f.close()` after 1133, the bogus-secret arguments
# after 1193/1202, and the `e_s = ...` binding before 1196); code below is
# reproduced verbatim from what is visible.
1116 def test_leases(self):
1117 ss = self.create("test_leases")
# secrets(n) returns (write-enabler, renew-secret, cancel-secret); the
# write-enabler is shared while renew/cancel secrets vary with n, so each
# n produces a distinct lease on the same slot.
1119 return ( self.write_enabler("we1"),
1120 self.renew_secret("we1-%d" % n),
1121 self.cancel_secret("we1-%d" % n) )
1122 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1123 write = ss.remote_slot_testv_and_readv_and_writev
1124 read = ss.remote_slot_readv
1125 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1126 self.failUnlessEqual(rc, (True, {}))
1128 # create a random non-numeric file in the bucket directory, to
1129 # exercise the code that's supposed to ignore those.
1130 bucket_dir = os.path.join(self.workdir("test_leases"),
1131 "shares", storage_index_to_dir("si1"))
1132 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1133 f.write("you ought to be ignoring me\n")
1136 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1137 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1139 # add-lease on a missing storage index is silently ignored
1140 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1142 # re-allocate the slots and use the same secrets, that should update
1144 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1145 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renewing with a known secret must not create an extra lease
1148 ss.remote_renew_lease("si1", secrets(0)[1])
1149 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1151 # now allocate them with a bunch of different secrets, to trigger the
1152 # extended lease code. Use add_lease for one of them.
1153 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1154 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1155 secrets2 = secrets(2)
1156 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1157 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1158 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1159 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1160 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1162 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1164 # cancel one of them
1165 ss.remote_cancel_lease("si1", secrets(5)[2])
1166 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1168 all_leases = list(s0.get_leases())
1169 # and write enough data to expand the container, forcing the server
1170 # to move the leases
1171 write("si1", secrets(0),
1172 {0: ([], [(0,data)], 200), },
1175 # read back the leases, make sure they're still intact.
1176 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1178 ss.remote_renew_lease("si1", secrets(0)[1])
1179 ss.remote_renew_lease("si1", secrets(1)[1])
1180 ss.remote_renew_lease("si1", secrets(2)[1])
1181 ss.remote_renew_lease("si1", secrets(3)[1])
1182 ss.remote_renew_lease("si1", secrets(4)[1])
1183 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1184 # get a new copy of the leases, with the current timestamps. Reading
1185 # data and failing to renew/cancel leases should leave the timestamps
1187 all_leases = list(s0.get_leases())
1188 # renewing with a bogus token should prompt an error message
1190 # examine the exception thus raised, make sure the old nodeid is
1191 # present, to provide for share migration
1192 e = self.failUnlessRaises(IndexError,
1193 ss.remote_renew_lease, "si1",
1196 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1197 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1198 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1200 # same for cancelling
1201 self.failUnlessRaises(IndexError,
1202 ss.remote_cancel_lease, "si1",
1204 self.compare_leases(all_leases, list(s0.get_leases()))
1206 # reading shares should not modify the timestamp
1207 read("si1", [], [(0,200)])
1208 self.compare_leases(all_leases, list(s0.get_leases()))
# container-expanding writes move the leases but keep their identity
1210 write("si1", secrets(0),
1211 {0: ([], [(200, "make me bigger")], None)}, [])
1212 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1214 write("si1", secrets(0),
1215 {0: ([], [(500, "make me really bigger")], None)}, [])
1216 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1218 # now cancel them all
1219 ss.remote_cancel_lease("si1", secrets(0)[2])
1220 ss.remote_cancel_lease("si1", secrets(1)[2])
1221 ss.remote_cancel_lease("si1", secrets(2)[2])
1222 ss.remote_cancel_lease("si1", secrets(3)[2])
1224 # the slot should still be there
1225 remaining_shares = read("si1", [], [(0,10)])
1226 self.failUnlessEqual(len(remaining_shares), 1)
1227 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1229 # cancelling a non-existent lease should raise an IndexError
1230 self.failUnlessRaises(IndexError,
1231 ss.remote_cancel_lease, "si1", "nonsecret")
1233 # and the slot should still be there
1234 remaining_shares = read("si1", [], [(0,10)])
1235 self.failUnlessEqual(len(remaining_shares), 1)
1236 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# cancelling the final lease deletes the share (and its bucket)
1238 ss.remote_cancel_lease("si1", secrets(4)[2])
1239 # now the slot should be gone
1240 no_shares = read("si1", [], [(0,10)])
1241 self.failUnlessEqual(no_shares, {})
1243 # cancelling a lease on a non-existent share should raise an IndexError
1244 self.failUnlessRaises(IndexError,
1245 ss.remote_cancel_lease, "si2", "nonsecret")
# Test: deleting mutable shares one at a time by writing them to size zero,
# then verifying the bucket directory itself is removed once all shares are
# gone (while its two-character prefix directory remains).
# NOTE(review): this listing has elided lines (inner numbering gaps, e.g. the
# writev payload dicts after 1257/1268/1276 and the `prefix = si[:2]` binding
# before 1287); code below is reproduced verbatim from what is visible.
1247 def test_remove(self):
1248 ss = self.create("test_remove")
1249 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1251 readv = ss.remote_slot_readv
1252 writev = ss.remote_slot_testv_and_readv_and_writev
1253 secrets = ( self.write_enabler("we1"),
1254 self.renew_secret("we1"),
1255 self.cancel_secret("we1") )
1256 # delete sh0 by setting its size to zero
1257 answer = writev("si1", secrets,
1260 # the answer should mention all the shares that existed before the
1262 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1263 # but a new read should show only sh1 and sh2
1264 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1267 # delete sh1 by setting its size to zero
1268 answer = writev("si1", secrets,
1271 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1272 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1275 # delete sh2 by setting its size to zero
1276 answer = writev("si1", secrets,
1279 self.failUnlessEqual(answer, (True, {2:[]}) )
1280 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1282 # and the bucket directory should now be gone
1283 si = base32.b2a("si1")
1284 # note: this is a detail of the storage server implementation, and
1285 # may change in the future
1287 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1288 bucketdir = os.path.join(prefixdir, si)
1289 self.failUnless(os.path.exists(prefixdir), prefixdir)
1290 self.failIf(os.path.exists(bucketdir), bucketdir)
# Test case for StorageServer latency statistics: feeds synthetic latency
# samples into add_latency() and checks the mean and percentile buckets
# reported by get_latencies(), plus the bounded sample-window size (1000).
# NOTE(review): the `def setUp`/`def tearDown`/`return` lines are elided in
# this listing (inner numbering gaps at 1293-1294, 1297, 1302-1303, 1308-1309,
# 1316, 1319, 1321); code below is reproduced verbatim from what is visible.
1292 class Stats(unittest.TestCase):
1295 self.sparent = LoggingServiceParent()
1296 self._lease_secret = itertools.count()
1298 return self.sparent.stopService()
1300 def workdir(self, name):
1301 basedir = os.path.join("storage", "Server", name)
1304 def create(self, name):
1305 workdir = self.workdir(name)
1306 ss = StorageServer(workdir, "\x00" * 20)
1307 ss.setServiceParent(self.sparent)
1310 def test_latencies(self):
1311 ss = self.create("test_latencies")
# 10000 samples for "allocate": only the most recent 1000 are retained,
# so the surviving values are 9000.0 .. 9999.0 (checked below).
1312 for i in range(10000):
1313 ss.add_latency("allocate", 1.0 * i)
1314 for i in range(1000):
1315 ss.add_latency("renew", 1.0 * i)
1317 ss.add_latency("cancel", 2.0 * i)
1318 ss.add_latency("get", 5.0)
1320 output = ss.get_latencies()
1322 self.failUnlessEqual(sorted(output.keys()),
1323 sorted(["allocate", "renew", "cancel", "get"]))
1324 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1325 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1326 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1327 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1328 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1329 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1330 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1331 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1332 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1334 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1335 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1336 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
1337 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1338 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1339 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1340 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1341 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1342 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
# "cancel" has only 10 samples, so the high percentiles all collapse to
# the largest sample (18).
1344 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1345 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1346 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1, output)
1347 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
1348 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1349 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1350 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1351 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1352 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
# a single-sample category reports that sample for every statistic
1354 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1355 self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1356 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1357 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1358 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1359 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1360 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1361 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1362 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
# NOTE(review): these two lines are the body of a module-level helper whose
# `def` header (presumably `def remove_tags(s):`, used by the tests below) is
# elided from this listing -- confirm against the full file. The first sub()
# replaces every HTML tag with a space; the second collapses whitespace runs.
1365 s = re.sub(r'<[^>]*>', ' ', s)
1366 s = re.sub(r'\s+', ' ', s)
# Test double: a BucketCountingCrawler that fires a caller-supplied Deferred
# (popped from self.hook_ds) each time a prefix directory finishes, so tests
# can observe the crawler mid-cycle.
# NOTE(review): lines 1372 and 1374-1375 are elided in this listing
# (presumably the `if self.hook_ds:` guard and the callback firing --
# confirm against the full file).
1369 class MyBucketCountingCrawler(BucketCountingCrawler):
1370 def finished_prefix(self, cycle, prefix):
1371 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1373 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    def add_bucket_counter(self):
        """Install the hookable MyBucketCountingCrawler in place of the stock
        bucket counter, persisting its state under the server's store
        directory."""
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        crawler = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
# Tests for the bucket-counting crawler and its rendering in the storage
# status web page: mid-cycle state, cleanup of bogus state entries at cycle
# end, and ETA reporting as prefixes complete.
# NOTE(review): this listing has elided lines throughout (def setUp/tearDown
# headers, several `return d2`/`return d` statements after the fireEventually
# retries, and blank separators); code below is reproduced verbatim from what
# is visible.
1382 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1385 self.s = service.MultiService()
1386 self.s.startService()
1388 return self.s.stopService()
1390 def test_bucket_counter(self):
1391 basedir = "storage/BucketCounter/bucket_counter"
1392 fileutil.make_dirs(basedir)
1393 ss = StorageServer(basedir, "\x00" * 20)
1394 # to make sure we capture the bucket-counting-crawler in the middle
1395 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1396 # also make it start sooner than usual.
1397 ss.bucket_counter.slow_start = 0
1398 orig_cpu_slice = ss.bucket_counter.cpu_slice
1399 ss.bucket_counter.cpu_slice = 0
1400 ss.setServiceParent(self.s)
1402 w = StorageStatus(ss)
1404 # this sample is before the crawler has started doing anything
1405 html = w.renderSynchronously()
1406 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1407 s = remove_tags(html)
1408 self.failUnlessIn("Accepting new shares: Yes", s)
1409 self.failUnlessIn("Reserved space: - 0 B (0)", s)
1410 self.failUnlessIn("Total buckets: Not computed yet", s)
1411 self.failUnlessIn("Next crawl in", s)
1413 # give the bucket-counting-crawler one tick to get started. The
1414 # cpu_slice=0 will force it to yield right after it processes the
1417 d = fireEventually()
# _check re-queues itself via fireEventually until the crawler has
# finished at least one prefix, then inspects mid-cycle rendering.
1418 def _check(ignored):
1419 # are we really right after the first prefix?
1420 state = ss.bucket_counter.get_state()
1421 if state["last-complete-prefix"] is None:
1422 d2 = fireEventually()
1423 d2.addCallback(_check)
1425 self.failUnlessEqual(state["last-complete-prefix"],
1426 ss.bucket_counter.prefixes[0])
1427 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1428 html = w.renderSynchronously()
1429 s = remove_tags(html)
1430 self.failUnlessIn(" Current crawl ", s)
1431 self.failUnlessIn(" (next work in ", s)
1432 d.addCallback(_check)
1434 # now give it enough time to complete a full cycle
1436 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1437 d.addCallback(lambda ignored: self.poll(_watch))
1438 def _check2(ignored):
1439 ss.bucket_counter.cpu_slice = orig_cpu_slice
1440 html = w.renderSynchronously()
1441 s = remove_tags(html)
1442 self.failUnlessIn("Total buckets: 0 (the number of", s)
1443 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1444 d.addCallback(_check2)
1447 def test_bucket_counter_cleanup(self):
1448 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1449 fileutil.make_dirs(basedir)
1450 ss = StorageServer(basedir, "\x00" * 20)
1451 # to make sure we capture the bucket-counting-crawler in the middle
1452 # of a cycle, we reach in and reduce its maximum slice time to 0.
1453 ss.bucket_counter.slow_start = 0
1454 orig_cpu_slice = ss.bucket_counter.cpu_slice
1455 ss.bucket_counter.cpu_slice = 0
1456 ss.setServiceParent(self.s)
1458 d = fireEventually()
1460 def _after_first_prefix(ignored):
1461 state = ss.bucket_counter.state
1462 if state["last-complete-prefix"] is None:
1463 d2 = fireEventually()
1464 d2.addCallback(_after_first_prefix)
1466 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1467 # now sneak in and mess with its state, to make sure it cleans up
1468 # properly at the end of the cycle
1469 self.failUnlessEqual(state["last-complete-prefix"],
1470 ss.bucket_counter.prefixes[0])
# -12 is an impossible cycle number; "bogusprefix!" is not a valid
# storage-index prefix -- both must be purged when the cycle finishes.
1471 state["bucket-counts"][-12] = {}
1472 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1473 ss.bucket_counter.save_state()
1474 d.addCallback(_after_first_prefix)
1476 # now give it enough time to complete a cycle
1478 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1479 d.addCallback(lambda ignored: self.poll(_watch))
1480 def _check2(ignored):
1481 ss.bucket_counter.cpu_slice = orig_cpu_slice
1482 s = ss.bucket_counter.get_state()
1483 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1484 self.failIf("bogusprefix!" in s["storage-index-samples"],
1485 s["storage-index-samples"].keys())
1486 d.addCallback(_check2)
1489 def test_bucket_counter_eta(self):
1490 basedir = "storage/BucketCounter/bucket_counter_eta"
1491 fileutil.make_dirs(basedir)
1492 ss = MyStorageServer(basedir, "\x00" * 20)
1493 ss.bucket_counter.slow_start = 0
1494 # these will be fired inside finished_prefix()
1495 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1496 w = StorageStatus(ss)
1498 d = defer.Deferred()
1500 def _check_1(ignored):
1501 # no ETA is available yet
1502 html = w.renderSynchronously()
1503 s = remove_tags(html)
1504 self.failUnlessIn("complete (next work", s)
1506 def _check_2(ignored):
1507 # one prefix has finished, so an ETA based upon that elapsed time
1508 # should be available.
1509 html = w.renderSynchronously()
1510 s = remove_tags(html)
1511 self.failUnlessIn("complete (ETA ", s)
1513 def _check_3(ignored):
1514 # two prefixes have finished
1515 html = w.renderSynchronously()
1516 s = remove_tags(html)
1517 self.failUnlessIn("complete (ETA ", s)
1520 hooks[0].addCallback(_check_1).addErrback(d.errback)
1521 hooks[1].addCallback(_check_2).addErrback(d.errback)
1522 hooks[2].addCallback(_check_3).addErrback(d.errback)
1524 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that can pause itself after its first bucket.

    When ``stop_after_first_bucket`` is set, the first call to
    ``process_bucket`` clears the flag and drives ``cpu_slice`` negative so
    the crawler yields immediately, letting tests examine mid-cycle state.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # one-shot: stop yielding-early on subsequent buckets
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # once the pause has been consumed, run long slices so the rest of
        # the cycle finishes quickly
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
# Plain attribute bag used by No_ST_BLOCKS_LeaseCheckingCrawler below to hold
# a copy of os.stat() results with st_blocks stripped out.
# NOTE(review): the class body (line 1539, presumably just `pass`) is elided
# in this listing.
1538 class BrokenStatResults:
# LeaseCheckingCrawler variant whose stat results lack st_blocks, simulating
# platforms (e.g. Windows) where that field is unavailable: copies every
# public stat attribute except st_blocks onto a BrokenStatResults instance.
# NOTE(review): this listing elides the method header (lines 1541-1542,
# presumably a `stat(...)` override binding `s`), the `continue` bodies of the
# two guards (1546, 1548) and the `return bsr` (1550) -- confirm against the
# full file.
1540 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1543 bsr = BrokenStatResults()
1544 for attrname in dir(s):
1545 if attrname.startswith("_"):
1547 if attrname == "st_blocks":
1549 setattr(bsr, attrname, getattr(s, attrname))
# StorageServer whose lease checker is the pause-after-first-bucket
# instrumented crawler defined above.
1552 class InstrumentedStorageServer(StorageServer):
1553 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
# StorageServer whose lease checker sees stat results without st_blocks.
1554 class No_ST_BLOCKS_StorageServer(StorageServer):
1555 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
# Test case for the lease-checking (expiration) crawler; uses PollMixin to
# wait for cycles and WebRenderingMixin to inspect the status page.
# NOTE(review): the `def setUp`/`def tearDown` headers (lines 1558-1559, 1562)
# are elided in this listing; code below is reproduced verbatim from what is
# visible.
1557 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1560 self.s = service.MultiService()
1561 self.s.startService()
1563 return self.s.stopService()
# Fixture builder: populates the given StorageServer with two immutable
# shares (si_0, si_1) and two mutable shares (si_2, si_3). si_1 and si_3 each
# get one extra lease. Records the storage indexes and all renew/cancel
# secrets on self for the tests to use.
# NOTE(review): this listing elides the `def make(si):` header (line 1566),
# the `sharenums`/size bindings and the `remote_close()` calls around the
# allocate_buckets sections (1583, 1588, 1590, 1592-1593, 1595, 1597);
# code below is reproduced verbatim from what is visible.
1565 def make_shares(self, ss):
1567 return (si, hashutil.tagged_hash("renew", si),
1568 hashutil.tagged_hash("cancel", si))
1569 def make_mutable(si):
1570 return (si, hashutil.tagged_hash("renew", si),
1571 hashutil.tagged_hash("cancel", si),
1572 hashutil.tagged_hash("write-enabler", si))
1573 def make_extra_lease(si, num):
1574 return (hashutil.tagged_hash("renew-%d" % num, si),
1575 hashutil.tagged_hash("cancel-%d" % num, si))
1577 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1578 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1579 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1580 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1581 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1582 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1584 canary = FakeCanary()
1585 # note: 'tahoe debug dump-share' will not handle this file, since the
1586 # inner contents are not a valid CHK share
1587 data = "\xff" * 1000
1589 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1591 w[0].remote_write(0, data)
1594 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1596 w[0].remote_write(0, data)
1598 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1600 writev = ss.remote_slot_testv_and_readv_and_writev
1601 writev(mutable_si_2, (we2, rs2, cs2),
1602 {0: ([], [(0,data)], len(data))}, [])
1603 writev(mutable_si_3, (we3, rs3, cs3),
1604 {0: ([], [(0,data)], len(data))}, [])
1605 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1607 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1608 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1609 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
# Test: one full lease-crawler cycle with expiration DISABLED. Checks the
# crawler's state dict before the cycle, mid-cycle (after the first bucket,
# using the instrumented pause), and after the cycle; also checks the web
# status page and its JSON rendering. Nothing should be recovered.
# NOTE(review): this listing has elided lines throughout (the file open/close
# around 1631, several `return d2`/`return d` statements, the
# `s = lc.get_state()` binding before 1719, and blank separators); code below
# is reproduced verbatim from what is visible.
1611 def test_basic(self):
1612 basedir = "storage/LeaseCrawler/basic"
1613 fileutil.make_dirs(basedir)
1614 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1615 # make it start sooner than usual.
1616 lc = ss.lease_checker
1619 lc.stop_after_first_bucket = True
1620 webstatus = StorageStatus(ss)
1622 # create a few shares, with some leases on them
1623 self.make_shares(ss)
1624 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1626 # add a non-sharefile to exercise another code path
1627 fn = os.path.join(ss.sharedir,
1628 storage_index_to_dir(immutable_si_0),
1631 f.write("I am not a share.\n")
1634 # this is before the crawl has started, so we're not in a cycle yet
1635 initial_state = lc.get_state()
1636 self.failIf(lc.get_progress()["cycle-in-progress"])
1637 self.failIfIn("cycle-to-date", initial_state)
1638 self.failIfIn("estimated-remaining-cycle", initial_state)
1639 self.failIfIn("estimated-current-cycle", initial_state)
1640 self.failUnlessIn("history", initial_state)
1641 self.failUnlessEqual(initial_state["history"], {})
1643 ss.setServiceParent(self.s)
1647 d = fireEventually()
1649 # now examine the state right after the first bucket has been
# _after_first_bucket re-queues itself until the crawler publishes
# "cycle-to-date", i.e. until the first bucket has been processed.
1651 def _after_first_bucket(ignored):
1652 initial_state = lc.get_state()
1653 if "cycle-to-date" not in initial_state:
1654 d2 = fireEventually()
1655 d2.addCallback(_after_first_bucket)
1657 self.failUnlessIn("cycle-to-date", initial_state)
1658 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1659 self.failUnlessIn("estimated-current-cycle", initial_state)
1660 self.failUnlessIn("history", initial_state)
1661 self.failUnlessEqual(initial_state["history"], {})
1663 so_far = initial_state["cycle-to-date"]
1664 self.failUnlessEqual(so_far["expiration-enabled"], False)
1665 self.failUnlessIn("configured-expiration-mode", so_far)
1666 self.failUnlessIn("lease-age-histogram", so_far)
1667 lah = so_far["lease-age-histogram"]
1668 self.failUnlessEqual(type(lah), list)
1669 self.failUnlessEqual(len(lah), 1)
1670 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1671 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1672 self.failUnlessEqual(so_far["corrupt-shares"], [])
1673 sr1 = so_far["space-recovered"]
1674 self.failUnlessEqual(sr1["examined-buckets"], 1)
1675 self.failUnlessEqual(sr1["examined-shares"], 1)
1676 self.failUnlessEqual(sr1["actual-shares"], 0)
1677 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1678 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1679 left = initial_state["estimated-remaining-cycle"]
1680 sr2 = left["space-recovered"]
1681 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1682 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1683 self.failIfEqual(sr2["actual-shares"], None)
1684 self.failIfEqual(sr2["configured-diskbytes"], None)
1685 self.failIfEqual(sr2["original-sharebytes"], None)
1686 d.addCallback(_after_first_bucket)
1687 d.addCallback(lambda ign: self.render1(webstatus))
1688 def _check_html_in_cycle(html):
1689 s = remove_tags(html)
1690 self.failUnlessIn("So far, this cycle has examined "
1691 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1692 self.failUnlessIn("and has recovered: "
1693 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1694 "0 B (0 B / 0 B)", s)
1695 self.failUnlessIn("If expiration were enabled, "
1696 "we would have recovered: "
1697 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1698 " 0 B (0 B / 0 B) by now", s)
1699 self.failUnlessIn("and the remainder of this cycle "
1700 "would probably recover: "
1701 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1702 " 0 B (0 B / 0 B)", s)
1703 self.failUnlessIn("and the whole cycle would probably recover: "
1704 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1705 " 0 B (0 B / 0 B)", s)
1706 self.failUnlessIn("if we were strictly using each lease's default "
1707 "31-day lease lifetime", s)
1708 self.failUnlessIn("this cycle would be expected to recover: ", s)
1709 d.addCallback(_check_html_in_cycle)
1711 # wait for the crawler to finish the first cycle. Nothing should have
1714 return bool(lc.get_state()["last-cycle-finished"] is not None)
1715 d.addCallback(lambda ign: self.poll(_wait))
1717 def _after_first_cycle(ignored):
1719 self.failIf("cycle-to-date" in s)
1720 self.failIf("estimated-remaining-cycle" in s)
1721 self.failIf("estimated-current-cycle" in s)
1722 last = s["history"][0]
1723 self.failUnlessIn("cycle-start-finish-times", last)
1724 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1725 self.failUnlessEqual(last["expiration-enabled"], False)
1726 self.failUnlessIn("configured-expiration-mode", last)
1728 self.failUnlessIn("lease-age-histogram", last)
1729 lah = last["lease-age-histogram"]
1730 self.failUnlessEqual(type(lah), list)
1731 self.failUnlessEqual(len(lah), 1)
1732 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1734 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1735 self.failUnlessEqual(last["corrupt-shares"], [])
# expiration is disabled, so every "recovered" counter must be zero
1737 rec = last["space-recovered"]
1738 self.failUnlessEqual(rec["examined-buckets"], 4)
1739 self.failUnlessEqual(rec["examined-shares"], 4)
1740 self.failUnlessEqual(rec["actual-buckets"], 0)
1741 self.failUnlessEqual(rec["original-buckets"], 0)
1742 self.failUnlessEqual(rec["configured-buckets"], 0)
1743 self.failUnlessEqual(rec["actual-shares"], 0)
1744 self.failUnlessEqual(rec["original-shares"], 0)
1745 self.failUnlessEqual(rec["configured-shares"], 0)
1746 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1747 self.failUnlessEqual(rec["original-diskbytes"], 0)
1748 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1749 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1750 self.failUnlessEqual(rec["original-sharebytes"], 0)
1751 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1753 def _get_sharefile(si):
1754 return list(ss._iter_share_files(si))[0]
1755 def count_leases(si):
1756 return len(list(_get_sharefile(si).get_leases()))
1757 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1758 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1759 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1760 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1761 d.addCallback(_after_first_cycle)
1762 d.addCallback(lambda ign: self.render1(webstatus))
1763 def _check_html(html):
1764 s = remove_tags(html)
1765 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1766 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1767 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1768 "(2 mutable / 2 immutable),", s)
1769 self.failUnlessIn("but expiration was not enabled", s)
1770 d.addCallback(_check_html)
1771 d.addCallback(lambda ign: self.render_json(webstatus))
1772 def _check_json(json):
1773 data = simplejson.loads(json)
1774 self.failUnlessIn("lease-checker", data)
1775 self.failUnlessIn("lease-checker-progress", data)
1776 d.addCallback(_check_json)
# Force a lease's expiration_time backwards by rewriting its record in the
# share file directly, since the public renew API refuses to back-date.
# Raises IndexError when no lease matches the given renew_secret.
# NOTE(review): lines 1788-1789 are elided in this listing (presumably
# `f.close()` and a `return` that stops after the first match -- confirm
# against the full file).
1779 def backdate_lease(self, sf, renew_secret, new_expire_time):
1780 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1781 # "renew" a lease with a new_expire_time that is older than what the
1782 # current lease has), so we have to reach inside it.
1783 for i,lease in enumerate(sf.get_leases()):
1784 if lease.renew_secret == renew_secret:
1785 lease.expiration_time = new_expire_time
1786 f = open(sf.home, 'rb+')
1787 sf._write_lease_record(f, i, lease)
1790 raise IndexError("unable to renew non-existent lease")
1792 def test_expire_age(self):
1793 basedir = "storage/LeaseCrawler/expire_age"
1794 fileutil.make_dirs(basedir)
1795 # setting expiration_time to 2000 means that any lease which is more
1796 # than 2000s old will be expired.
1797 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1798 expiration_enabled=True,
1799 expiration_mode="age",
1800 expiration_override_lease_duration=2000)
1801 # make it start sooner than usual.
1802 lc = ss.lease_checker
1804 lc.stop_after_first_bucket = True
1805 webstatus = StorageStatus(ss)
1807 # create a few shares, with some leases on them
1808 self.make_shares(ss)
1809 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1811 def count_shares(si):
1812 return len(list(ss._iter_share_files(si)))
1813 def _get_sharefile(si):
1814 return list(ss._iter_share_files(si))[0]
1815 def count_leases(si):
1816 return len(list(_get_sharefile(si).get_leases()))
1818 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1819 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1820 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1821 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1822 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1823 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1824 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1825 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1827 # artificially crank back the expiration time on the first lease of
1828 # each share, to make it look like it expired already (age=1000s).
1829 # Some shares have an extra lease which is set to expire at the
1830 # default time in 31 days from now (age=31days). We then run the
1831 # crawler, which will expire the first lease, making some shares get
1832 # deleted and others stay alive (with one remaining lease)
1835 sf0 = _get_sharefile(immutable_si_0)
1836 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1837 sf0_size = os.stat(sf0.home).st_size
1839 # immutable_si_1 gets an extra lease
1840 sf1 = _get_sharefile(immutable_si_1)
1841 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1843 sf2 = _get_sharefile(mutable_si_2)
1844 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1845 sf2_size = os.stat(sf2.home).st_size
1847 # mutable_si_3 gets an extra lease
1848 sf3 = _get_sharefile(mutable_si_3)
1849 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1851 ss.setServiceParent(self.s)
1853 d = fireEventually()
1854 # examine the state right after the first bucket has been processed
1855 def _after_first_bucket(ignored):
1856 p = lc.get_progress()
1857 if not p["cycle-in-progress"]:
1858 d2 = fireEventually()
1859 d2.addCallback(_after_first_bucket)
1861 d.addCallback(_after_first_bucket)
1862 d.addCallback(lambda ign: self.render1(webstatus))
1863 def _check_html_in_cycle(html):
1864 s = remove_tags(html)
1865 # the first bucket encountered gets deleted, and its prefix
1866 # happens to be about 1/5th of the way through the ring, so the
1867 # predictor thinks we'll have 5 shares and that we'll delete them
1868 # all. This part of the test depends upon the SIs landing right
1869 # where they do now.
1870 self.failUnlessIn("The remainder of this cycle is expected to "
1871 "recover: 4 shares, 4 buckets", s)
1872 self.failUnlessIn("The whole cycle is expected to examine "
1873 "5 shares in 5 buckets and to recover: "
1874 "5 shares, 5 buckets", s)
1875 d.addCallback(_check_html_in_cycle)
1877 # wait for the crawler to finish the first cycle. Two shares should
1880 return bool(lc.get_state()["last-cycle-finished"] is not None)
1881 d.addCallback(lambda ign: self.poll(_wait))
1883 def _after_first_cycle(ignored):
1884 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1885 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1886 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1887 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1888 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1889 self.failUnlessEqual(count_leases(mutable_si_3), 1)
1892 last = s["history"][0]
1894 self.failUnlessEqual(last["expiration-enabled"], True)
1895 self.failUnlessEqual(last["configured-expiration-mode"],
1896 ("age", 2000, None, ("mutable", "immutable")))
1897 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1899 rec = last["space-recovered"]
1900 self.failUnlessEqual(rec["examined-buckets"], 4)
1901 self.failUnlessEqual(rec["examined-shares"], 4)
1902 self.failUnlessEqual(rec["actual-buckets"], 2)
1903 self.failUnlessEqual(rec["original-buckets"], 2)
1904 self.failUnlessEqual(rec["configured-buckets"], 2)
1905 self.failUnlessEqual(rec["actual-shares"], 2)
1906 self.failUnlessEqual(rec["original-shares"], 2)
1907 self.failUnlessEqual(rec["configured-shares"], 2)
1908 size = sf0_size + sf2_size
1909 self.failUnlessEqual(rec["actual-sharebytes"], size)
1910 self.failUnlessEqual(rec["original-sharebytes"], size)
1911 self.failUnlessEqual(rec["configured-sharebytes"], size)
1912 # different platforms have different notions of "blocks used by
1913 # this file", so merely assert that it's a number
1914 self.failUnless(rec["actual-diskbytes"] >= 0,
1915 rec["actual-diskbytes"])
1916 self.failUnless(rec["original-diskbytes"] >= 0,
1917 rec["original-diskbytes"])
1918 self.failUnless(rec["configured-diskbytes"] >= 0,
1919 rec["configured-diskbytes"])
1920 d.addCallback(_after_first_cycle)
1921 d.addCallback(lambda ign: self.render1(webstatus))
1922 def _check_html(html):
1923 s = remove_tags(html)
1924 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1925 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1926 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1927 d.addCallback(_check_html)
# Exercise the lease expirer in "cutoff-date" mode: leases last renewed
# before the cutoff are expired; shares whose only lease expires are
# deleted, shares holding a second (fresh) lease survive with one lease.
# NOTE(review): this extraction has lines elided (embedded numbering jumps);
# hedged notes below mark where statements appear to be missing.
1930 def test_expire_cutoff_date(self):
1931 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1932 fileutil.make_dirs(basedir)
1933 # setting cutoff-date to 2000 seconds ago means that any lease which
1934 # is more than 2000s old will be expired.
# NOTE(review): `now` is used below but its assignment (presumably
# `now = time.time()` on the elided line 1935) is not visible here — confirm.
1936 then = int(now - 2000)
1937 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1938 expiration_enabled=True,
1939 expiration_mode="cutoff-date",
1940 expiration_cutoff_date=then)
1941 # make it start sooner than usual.
1942 lc = ss.lease_checker
# stop_after_first_bucket lets the test inspect mid-cycle state below.
1944 lc.stop_after_first_bucket = True
1945 webstatus = StorageStatus(ss)
1947 # create a few shares, with some leases on them
1948 self.make_shares(ss)
1949 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# small helpers over the server's share iterator
1951 def count_shares(si):
1952 return len(list(ss._iter_share_files(si)))
1953 def _get_sharefile(si):
1954 return list(ss._iter_share_files(si))[0]
1955 def count_leases(si):
1956 return len(list(_get_sharefile(si).get_leases()))
# initial layout: one share each; si_1 and si_3 carry a second lease
1958 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1959 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1960 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1961 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1962 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1963 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1964 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1965 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1967 # artificially crank back the expiration time on the first lease of
1968 # each share, to make it look like was renewed 3000s ago. To achieve
1969 # this, we need to set the expiration time to now-3000+31days. This
1970 # will change when the lease format is improved to contain both
1971 # create/renew time and duration.
1972 new_expiration_time = now - 3000 + 31*24*60*60
1974 # Some shares have an extra lease which is set to expire at the
1975 # default time in 31 days from now (age=31days). We then run the
1976 # crawler, which will expire the first lease, making some shares get
1977 # deleted and others stay alive (with one remaining lease)
1979 sf0 = _get_sharefile(immutable_si_0)
1980 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
# remember sizes of the shares that will be deleted, for the
# space-recovered assertions below
1981 sf0_size = os.stat(sf0.home).st_size
1983 # immutable_si_1 gets an extra lease
1984 sf1 = _get_sharefile(immutable_si_1)
1985 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1987 sf2 = _get_sharefile(mutable_si_2)
1988 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1989 sf2_size = os.stat(sf2.home).st_size
1991 # mutable_si_3 gets an extra lease
1992 sf3 = _get_sharefile(mutable_si_3)
1993 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
# starting the service kicks off the crawler
1995 ss.setServiceParent(self.s)
1997 d = fireEventually()
1998 # examine the state right after the first bucket has been processed
1999 def _after_first_bucket(ignored):
2000 p = lc.get_progress()
2001 if not p["cycle-in-progress"]:
# not started yet: reschedule ourselves until the cycle begins
# NOTE(review): a `return d2` appears to be elided here (line 2004).
2002 d2 = fireEventually()
2003 d2.addCallback(_after_first_bucket)
2005 d.addCallback(_after_first_bucket)
2006 d.addCallback(lambda ign: self.render1(webstatus))
2007 def _check_html_in_cycle(html):
2008 s = remove_tags(html)
2009 # the first bucket encountered gets deleted, and its prefix
2010 # happens to be about 1/5th of the way through the ring, so the
2011 # predictor thinks we'll have 5 shares and that we'll delete them
2012 # all. This part of the test depends upon the SIs landing right
2013 # where they do now.
2014 self.failUnlessIn("The remainder of this cycle is expected to "
2015 "recover: 4 shares, 4 buckets", s)
2016 self.failUnlessIn("The whole cycle is expected to examine "
2017 "5 shares in 5 buckets and to recover: "
2018 "5 shares, 5 buckets", s)
2019 d.addCallback(_check_html_in_cycle)
2021 # wait for the crawler to finish the first cycle. Two shares should
# NOTE(review): the `def _wait():` header (lines 2022-2023) is elided here.
2024 return bool(lc.get_state()["last-cycle-finished"] is not None)
2025 d.addCallback(lambda ign: self.poll(_wait))
2027 def _after_first_cycle(ignored):
# shares whose only lease was backdated are gone; the others keep
# exactly one (fresh) lease
2028 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2029 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2030 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2031 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2032 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2033 self.failUnlessEqual(count_leases(mutable_si_3), 1)
# NOTE(review): `s = lc.get_state()` (elided, ~line 2035) presumably
# precedes this use of `s` — confirm.
2036 last = s["history"][0]
2038 self.failUnlessEqual(last["expiration-enabled"], True)
2039 self.failUnlessEqual(last["configured-expiration-mode"],
2040 ("cutoff-date", None, then,
2041 ("mutable", "immutable")))
# NOTE(review): the expected histogram literal (lines 2043-2044) is elided.
2042 self.failUnlessEqual(last["leases-per-share-histogram"],
2045 rec = last["space-recovered"]
2046 self.failUnlessEqual(rec["examined-buckets"], 4)
2047 self.failUnlessEqual(rec["examined-shares"], 4)
2048 self.failUnlessEqual(rec["actual-buckets"], 2)
2049 self.failUnlessEqual(rec["original-buckets"], 0)
2050 self.failUnlessEqual(rec["configured-buckets"], 2)
2051 self.failUnlessEqual(rec["actual-shares"], 2)
2052 self.failUnlessEqual(rec["original-shares"], 0)
2053 self.failUnlessEqual(rec["configured-shares"], 2)
2054 size = sf0_size + sf2_size
2055 self.failUnlessEqual(rec["actual-sharebytes"], size)
2056 self.failUnlessEqual(rec["original-sharebytes"], 0)
2057 self.failUnlessEqual(rec["configured-sharebytes"], size)
2058 # different platforms have different notions of "blocks used by
2059 # this file", so merely assert that it's a number
2060 self.failUnless(rec["actual-diskbytes"] >= 0,
2061 rec["actual-diskbytes"])
2062 self.failUnless(rec["original-diskbytes"] >= 0,
2063 rec["original-diskbytes"])
2064 self.failUnless(rec["configured-diskbytes"] >= 0,
2065 rec["configured-diskbytes"])
2066 d.addCallback(_after_first_cycle)
2067 d.addCallback(lambda ign: self.render1(webstatus))
2068 def _check_html(html):
2069 s = remove_tags(html)
2070 self.failUnlessIn("Expiration Enabled:"
2071 " expired leases will be removed", s)
2072 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2073 substr = "Leases created or last renewed before %s will be considered expired." % date
2074 self.failUnlessIn(substr, s)
2075 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2076 d.addCallback(_check_html)
# Cutoff-date expiration restricted to expiration_sharetypes=("immutable",):
# every lease is backdated past the cutoff, but only the immutable shares
# are deleted; the mutable shares keep all their leases.
# NOTE(review): lines elided in this extraction (e.g. `now = time.time()`
# before 2083, the `def _wait():` header before 2118, and the final returns).
2079 def test_only_immutable(self):
2080 basedir = "storage/LeaseCrawler/only_immutable"
2081 fileutil.make_dirs(basedir)
# `now` is referenced but its assignment is on an elided line — confirm.
2083 then = int(now - 2000)
2084 ss = StorageServer(basedir, "\x00" * 20,
2085 expiration_enabled=True,
2086 expiration_mode="cutoff-date",
2087 expiration_cutoff_date=then,
2088 expiration_sharetypes=("immutable",))
2089 lc = ss.lease_checker
2091 webstatus = StorageStatus(ss)
2093 self.make_shares(ss)
2094 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2095 # set all leases to be expirable
2096 new_expiration_time = now - 3000 + 31*24*60*60
# helpers over the server's share iterator
2098 def count_shares(si):
2099 return len(list(ss._iter_share_files(si)))
2100 def _get_sharefile(si):
2101 return list(ss._iter_share_files(si))[0]
2102 def count_leases(si):
2103 return len(list(_get_sharefile(si).get_leases()))
# backdate every lease on every share (including the second leases)
2105 sf0 = _get_sharefile(immutable_si_0)
2106 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2107 sf1 = _get_sharefile(immutable_si_1)
2108 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2109 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2110 sf2 = _get_sharefile(mutable_si_2)
2111 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2112 sf3 = _get_sharefile(mutable_si_3)
2113 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2114 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2116 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (line 2117) is elided here.
2118 return bool(lc.get_state()["last-cycle-finished"] is not None)
2119 d = self.poll(_wait)
2121 def _after_first_cycle(ignored):
# immutable shares expired and removed; mutable shares untouched
2122 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2123 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2124 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2125 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2126 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2127 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2128 d.addCallback(_after_first_cycle)
2129 d.addCallback(lambda ign: self.render1(webstatus))
2130 def _check_html(html):
2131 s = remove_tags(html)
2132 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2133 d.addCallback(_check_html)
# Mirror image of test_only_immutable: expiration restricted to
# expiration_sharetypes=("mutable",). All leases are backdated, but only
# the mutable shares are deleted; immutable shares keep all their leases.
# NOTE(review): same elision pattern as the sibling test (missing
# `now = time.time()`, `def _wait():` header, and trailing returns).
2136 def test_only_mutable(self):
2137 basedir = "storage/LeaseCrawler/only_mutable"
2138 fileutil.make_dirs(basedir)
# `now` is referenced but its assignment is on an elided line — confirm.
2140 then = int(now - 2000)
2141 ss = StorageServer(basedir, "\x00" * 20,
2142 expiration_enabled=True,
2143 expiration_mode="cutoff-date",
2144 expiration_cutoff_date=then,
2145 expiration_sharetypes=("mutable",))
2146 lc = ss.lease_checker
2148 webstatus = StorageStatus(ss)
2150 self.make_shares(ss)
2151 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2152 # set all leases to be expirable
2153 new_expiration_time = now - 3000 + 31*24*60*60
# helpers over the server's share iterator
2155 def count_shares(si):
2156 return len(list(ss._iter_share_files(si)))
2157 def _get_sharefile(si):
2158 return list(ss._iter_share_files(si))[0]
2159 def count_leases(si):
2160 return len(list(_get_sharefile(si).get_leases()))
# backdate every lease on every share (including the second leases)
2162 sf0 = _get_sharefile(immutable_si_0)
2163 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2164 sf1 = _get_sharefile(immutable_si_1)
2165 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2166 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2167 sf2 = _get_sharefile(mutable_si_2)
2168 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2169 sf3 = _get_sharefile(mutable_si_3)
2170 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2171 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2173 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (line 2174) is elided here.
2175 return bool(lc.get_state()["last-cycle-finished"] is not None)
2176 d = self.poll(_wait)
2178 def _after_first_cycle(ignored):
# immutable shares untouched; mutable shares expired and removed
2179 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2180 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2181 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2182 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2183 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2184 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2185 d.addCallback(_after_first_cycle)
2186 d.addCallback(lambda ign: self.render1(webstatus))
2187 def _check_html(html):
2188 s = remove_tags(html)
2189 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2190 d.addCallback(_check_html)
def test_bad_mode(self):
    """Constructing a StorageServer with an unknown expiration_mode fails.

    The server validates the GC mode eagerly, in its constructor, so a
    bogus mode must raise ValueError with a helpful message.
    """
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    err = self.failUnlessRaises(ValueError,
                                StorageServer, workdir, "\x00" * 20,
                                expiration_mode="bogus")
    # the message names the bad mode and lists the valid ones
    self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(err))
# Unit test for time_format.parse_duration: accepts "<N><unit>" strings
# with optional whitespace and singular/plural unit names.
# NOTE(review): the definitions of DAY, MONTH, and YEAR (lines 2202-2204)
# are elided in this extraction; presumably DAY = 24*60*60, MONTH = 31*DAY,
# YEAR = 365*DAY to match parse_duration's units — confirm.
2201 def test_parse_duration(self):
2205 p = time_format.parse_duration
2206 self.failUnlessEqual(p("7days"), 7*DAY)
2207 self.failUnlessEqual(p("31day"), 31*DAY)
2208 self.failUnlessEqual(p("60 days"), 60*DAY)
2209 self.failUnlessEqual(p("2mo"), 2*MONTH)
2210 self.failUnlessEqual(p("3 month"), 3*MONTH)
2211 self.failUnlessEqual(p("2years"), 2*YEAR)
# an unrecognized unit is rejected with a descriptive ValueError
2212 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2213 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    """time_format.parse_date turns an ISO-style date into a unix timestamp."""
    parse = time_format.parse_date
    # the result must be an int (not a float or string)
    self.failUnless(isinstance(parse("2009-03-18"), int), parse("2009-03-18"))
    # midnight UTC at the start of 2009-03-18
    self.failUnlessEqual(parse("2009-03-18"), 1237334400)
# The lease checker keeps only a bounded history of completed cycles:
# after 15+ cycles, the history dict holds the most recent 10 entries.
# NOTE(review): several lines are elided in this extraction (crawler
# speed-up settings ~2226-2228, the poll function's returns ~2237-2240,
# and `s = lc.get_state(); h = s["history"]` ~2244-2245) — confirm.
2220 def test_limited_history(self):
2221 basedir = "storage/LeaseCrawler/limited_history"
2222 fileutil.make_dirs(basedir)
2223 ss = StorageServer(basedir, "\x00" * 20)
2224 # make it start sooner than usual.
2225 lc = ss.lease_checker
2229 # create a few shares, with some leases on them
2230 self.make_shares(ss)
2232 ss.setServiceParent(self.s)
# poll predicate: true once at least 15 full cycles have finished
2234 def _wait_until_15_cycles_done():
2235 last = lc.state["last-cycle-finished"]
2236 if last is not None and last >= 15:
2241 d = self.poll(_wait_until_15_cycles_done)
2243 def _check(ignored):
# `h` is presumably the history dict from lc.get_state() (elided) —
# only the 10 most recent cycles (6..15) are retained
2246 self.failUnlessEqual(len(h), 10)
2247 self.failUnlessEqual(max(h.keys()), 15)
2248 self.failUnlessEqual(min(h.keys()), 6)
2249 d.addCallback(_check)
# Interrupt the crawler before it has made measurable progress, so the
# expirer's progress predictor must report "cannot predict": every field
# of the estimated space-recovered structures is None.
2252 def test_unpredictable_future(self):
2253 basedir = "storage/LeaseCrawler/unpredictable_future"
2254 fileutil.make_dirs(basedir)
2255 ss = StorageServer(basedir, "\x00" * 20)
2256 # make it start sooner than usual.
2257 lc = ss.lease_checker
2259 lc.cpu_slice = -1.0 # stop quickly
2261 self.make_shares(ss)
2263 ss.setServiceParent(self.s)
2265 d = fireEventually()
2266 def _check(ignored):
2267 # this should fire after the first bucket is complete, but before
2268 # the first prefix is complete, so the progress-measurer won't
2269 # think we've gotten far enough to raise our percent-complete
2270 # above 0%, triggering the cannot-predict-the-future code in
2271 # expirer.py . This will have to change if/when the
2272 # progress-measurer gets smart enough to count buckets (we'll
2273 # have to interrupt it even earlier, before it's finished the
# NOTE(review): `s = lc.get_state()` (elided, ~line 2275) presumably
# precedes this use of `s` — confirm.
2276 if "cycle-to-date" not in s:
# state not populated yet: reschedule ourselves
# NOTE(review): a `return d2` appears to be elided here (~line 2279).
2277 d2 = fireEventually()
2278 d2.addCallback(_check)
2280 self.failUnlessIn("cycle-to-date", s)
2281 self.failUnlessIn("estimated-remaining-cycle", s)
2282 self.failUnlessIn("estimated-current-cycle", s)
2284 left = s["estimated-remaining-cycle"]["space-recovered"]
2285 self.failUnlessEqual(left["actual-buckets"], None)
2286 self.failUnlessEqual(left["original-buckets"], None)
2287 self.failUnlessEqual(left["configured-buckets"], None)
2288 self.failUnlessEqual(left["actual-shares"], None)
2289 self.failUnlessEqual(left["original-shares"], None)
2290 self.failUnlessEqual(left["configured-shares"], None)
2291 self.failUnlessEqual(left["actual-diskbytes"], None)
2292 self.failUnlessEqual(left["original-diskbytes"], None)
2293 self.failUnlessEqual(left["configured-diskbytes"], None)
2294 self.failUnlessEqual(left["actual-sharebytes"], None)
2295 self.failUnlessEqual(left["original-sharebytes"], None)
2296 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): looks like a copy-paste slip — the variable is named
# `full` and the test asserted "estimated-current-cycle" exists above,
# yet this reads "estimated-remaining-cycle" again, leaving the
# current-cycle estimate unexamined. Probably should be
# s["estimated-current-cycle"]["space-recovered"] — confirm upstream.
2298 full = s["estimated-remaining-cycle"]["space-recovered"]
2299 self.failUnlessEqual(full["actual-buckets"], None)
2300 self.failUnlessEqual(full["original-buckets"], None)
2301 self.failUnlessEqual(full["configured-buckets"], None)
2302 self.failUnlessEqual(full["actual-shares"], None)
2303 self.failUnlessEqual(full["original-shares"], None)
2304 self.failUnlessEqual(full["configured-shares"], None)
2305 self.failUnlessEqual(full["actual-diskbytes"], None)
2306 self.failUnlessEqual(full["original-diskbytes"], None)
2307 self.failUnlessEqual(full["configured-diskbytes"], None)
2308 self.failUnlessEqual(full["actual-sharebytes"], None)
2309 self.failUnlessEqual(full["original-sharebytes"], None)
2310 self.failUnlessEqual(full["configured-sharebytes"], None)
2312 d.addCallback(_check)
# When os.stat() results lack .st_blocks (as on some platforms), the
# expirer must fall back to reporting diskbytes == sharebytes.
# No_ST_BLOCKS_StorageServer simulates that platform.
2315 def test_no_st_blocks(self):
2316 basedir = "storage/LeaseCrawler/no_st_blocks"
2317 fileutil.make_dirs(basedir)
2318 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2319 expiration_mode="age",
2320 expiration_override_lease_duration=-1000)
2321 # a negative expiration_time= means the "configured-"
2322 # space-recovered counts will be non-zero, since all shares will have
2325 # make it start sooner than usual.
2326 lc = ss.lease_checker
2329 self.make_shares(ss)
2330 ss.setServiceParent(self.s)
# NOTE(review): the `def _wait():` header (line 2331) is elided here.
2332 return bool(lc.get_state()["last-cycle-finished"] is not None)
2333 d = self.poll(_wait)
2335 def _check(ignored):
# NOTE(review): `s = lc.get_state()` (elided, ~line 2336) presumably
# precedes this use of `s` — confirm.
2337 last = s["history"][0]
2338 rec = last["space-recovered"]
# all 4 buckets/shares are configured for expiration (negative age)
2339 self.failUnlessEqual(rec["configured-buckets"], 4)
2340 self.failUnlessEqual(rec["configured-shares"], 4)
2341 self.failUnless(rec["configured-sharebytes"] > 0,
2342 rec["configured-sharebytes"])
2343 # without the .st_blocks field in os.stat() results, we should be
2344 # reporting diskbytes==sharebytes
2345 self.failUnlessEqual(rec["configured-sharebytes"],
2346 rec["configured-diskbytes"])
2347 d.addCallback(_check)
# A corrupted share file must not stop the lease crawler: it is recorded
# in "corrupt-shares" (and rendered in the JSON and HTML status pages)
# while the remaining buckets are still examined.
2350 def test_share_corruption(self):
# tell the PollMixin these logged errors are expected, not failures
2351 self._poll_should_ignore_these_errors = [
2352 UnknownMutableContainerVersionError,
2353 UnknownImmutableContainerVersionError,
2355 basedir = "storage/LeaseCrawler/share_corruption"
2356 fileutil.make_dirs(basedir)
2357 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2358 w = StorageStatus(ss)
2359 # make it start sooner than usual.
2360 lc = ss.lease_checker
2361 lc.stop_after_first_bucket = True
2365 # create a few shares, with some leases on them
2366 self.make_shares(ss)
2368 # now corrupt one, and make sure the lease-checker keeps going
2369 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2370 first = min(self.sis)
2371 first_b32 = base32.b2a(first)
2372 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
# NOTE(review): the open() of `fn` (~lines 2373-2374) is elided here;
# this write clobbers the share's magic header.
2375 f.write("BAD MAGIC")
2377 # if get_share_file() doesn't see the correct mutable magic, it
2378 # assumes the file is an immutable share, and then
2379 # immutable.ShareFile sees a bad version. So regardless of which kind
2380 # of share we corrupted, this will trigger an
2381 # UnknownImmutableContainerVersionError.
2383 # also create an empty bucket
2384 empty_si = base32.b2a("\x04"*16)
2385 empty_bucket_dir = os.path.join(ss.sharedir,
2386 storage_index_to_dir(empty_si))
2387 fileutil.make_dirs(empty_bucket_dir)
2389 ss.setServiceParent(self.s)
2391 d = fireEventually()
2393 # now examine the state right after the first bucket has been
2395 def _after_first_bucket(ignored):
# NOTE(review): `s = lc.get_state()` (elided, ~line 2396) presumably
# precedes this use of `s` — confirm.
2397 if "cycle-to-date" not in s:
# not started yet: reschedule (a `return d2` appears elided ~2400)
2398 d2 = fireEventually()
2399 d2.addCallback(_after_first_bucket)
2401 so_far = s["cycle-to-date"]
2402 rec = so_far["space-recovered"]
# one bucket examined, zero valid shares (the one share was corrupt)
2403 self.failUnlessEqual(rec["examined-buckets"], 1)
2404 self.failUnlessEqual(rec["examined-shares"], 0)
2405 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2406 d.addCallback(_after_first_bucket)
2408 d.addCallback(lambda ign: self.render_json(w))
2409 def _check_json(json):
2410 data = simplejson.loads(json)
2411 # grr. json turns all dict keys into strings.
2412 so_far = data["lease-checker"]["cycle-to-date"]
2413 corrupt_shares = so_far["corrupt-shares"]
2414 # it also turns all tuples into lists
2415 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2416 d.addCallback(_check_json)
2417 d.addCallback(lambda ign: self.render1(w))
2418 def _check_html(html):
2419 s = remove_tags(html)
2420 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2421 d.addCallback(_check_html)
# NOTE(review): the `def _wait():` header (~lines 2422-2423) is elided here.
2424 return bool(lc.get_state()["last-cycle-finished"] is not None)
2425 d.addCallback(lambda ign: self.poll(_wait))
2427 def _after_first_cycle(ignored):
# NOTE(review): `s = lc.get_state()` (elided, ~line 2428) — confirm.
2429 last = s["history"][0]
2430 rec = last["space-recovered"]
# the crawler kept going: all 5 buckets (incl. the empty one) were
# examined, and the 3 uncorrupted shares were counted
2431 self.failUnlessEqual(rec["examined-buckets"], 5)
2432 self.failUnlessEqual(rec["examined-shares"], 3)
2433 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2434 d.addCallback(_after_first_cycle)
2435 d.addCallback(lambda ign: self.render_json(w))
2436 def _check_json_history(json):
2437 data = simplejson.loads(json)
2438 last = data["lease-checker"]["history"]["0"]
2439 corrupt_shares = last["corrupt-shares"]
2440 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2441 d.addCallback(_check_json_history)
2442 d.addCallback(lambda ign: self.render1(w))
2443 def _check_html_history(html):
2444 s = remove_tags(html)
2445 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2446 d.addCallback(_check_html_history)
# clear the expected container-version errors out of the trial log so
# they are not reported as test failures
2449 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2450 UnknownImmutableContainerVersionError)
# Render `page` with ?t=json and return a Deferred firing with the output.
# NOTE(review): the trailing `return d` (~line 2457) is elided in this
# extraction — confirm.
2455 def render_json(self, page):
2456 d = self.render1(page, args={"t": ["json"]})
# Tests for the StorageStatus web page. PollMixin supplies self.poll();
# WebRenderingMixin supplies render1()/renderSynchronously() helpers.
2459 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# NOTE(review): the `def setUp(self):` header (~line 2461) is elided in
# this extraction; these two lines start a MultiService for the tests.
2462 self.s = service.MultiService()
2463 self.s.startService()
# NOTE(review): the `def tearDown(self):` header (~line 2464) is elided;
# returning the Deferred makes trial wait for shutdown.
2465 return self.s.stopService()
def test_no_server(self):
    """With no storage server attached, the page shows a placeholder."""
    page = StorageStatus(None)
    rendered = page.renderSynchronously()
    self.failUnlessIn("<h1>No Storage Server Running</h1>", rendered)
# Render the status page for a default StorageServer and check both the
# HTML and the JSON (?t=json) representations.
2472 def test_status(self):
2473 basedir = "storage/WebStatus/status"
2474 fileutil.make_dirs(basedir)
2475 ss = StorageServer(basedir, "\x00" * 20)
2476 ss.setServiceParent(self.s)
2477 w = StorageStatus(ss)
# NOTE(review): `d = self.render1(w)` (~line 2478) appears elided here.
2479 def _check_html(html):
2480 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2481 s = remove_tags(html)
2482 self.failUnlessIn("Accepting new shares: Yes", s)
2483 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2484 d.addCallback(_check_html)
2485 d.addCallback(lambda ign: self.render_json(w))
2486 def _check_json(json):
2487 data = simplejson.loads(json)
# NOTE(review): `s = data["stats"]` (~line 2488) appears elided —
# the keys below look like stats-dict entries; confirm.
2489 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2490 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2491 self.failUnlessIn("bucket-counter", data)
2492 self.failUnlessIn("lease-checker", data)
2493 d.addCallback(_check_json)
# Render `page` with ?t=json and return a Deferred firing with the output.
# NOTE(review): the trailing `return d` (~line 2498) is elided in this
# extraction — confirm.
2496 def render_json(self, page):
2497 d = self.render1(page, args={"t": ["json"]})
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_no_disk_stats(self, mock_get_disk_stats):
    """Missing disk-stats API: sizes render as "?" but shares are accepted.

    Some platforms may have no disk stats API at all; get_disk_stats is
    patched to raise AttributeError to simulate that everywhere.
    """
    mock_get_disk_stats.side_effect = AttributeError()

    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page_html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page_html)
    text = remove_tags(page_html)
    # no stats is not an error condition: the server still accepts shares
    self.failUnlessIn("Accepting new shares: Yes", text)
    # ...but the unknown sizes are shown as "?"
    self.failUnlessIn("Total disk space: ?", text)
    self.failUnlessIn("Space Available to Tahoe: ?", text)
    # and the API reports "unknown" (None) rather than a number
    self.failUnless(server.get_available_space() is None)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_bad_disk_stats(self, mock_get_disk_stats):
    """Failing disk-stats call: server stops accepting, space reads 0.

    If the disk-stats API exists but raises OSError, the status page must
    show that no new shares will be accepted and get_available_space()
    must return 0 (not None).
    """
    mock_get_disk_stats.side_effect = OSError()

    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page_html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page_html)
    text = remove_tags(page_html)
    # a failing stats call is treated as "no room": refuse new shares
    self.failUnlessIn("Accepting new shares: No", text)
    self.failUnlessIn("Total disk space: ?", text)
    self.failUnlessIn("Space Available to Tahoe: ?", text)
    self.failUnlessEqual(server.get_available_space(), 0)
def test_readonly(self):
    """A readonly server's status page reports that shares are refused."""
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    page_html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page_html)
    text = remove_tags(page_html)
    self.failUnlessIn("Accepting new shares: No", text)
def test_reserved(self):
    """The configured reserved space is rendered on the status page."""
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    page_html = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page_html)
    text = remove_tags(page_html)
    # both the abbreviated and the exact byte count are shown
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", text)
def test_huge_reserved(self):
    """Reserved-space rendering, in a workdir of its own.

    NOTE(review): this test was a byte-for-byte copy of test_reserved and
    even reused its basedir ("storage/WebStatus/reserved"), so the two
    tests shared on-disk state. It now uses a distinct basedir. The body
    is otherwise unchanged; if a genuinely "huge" reservation was intended
    (the name suggests so), the reserved_space value and the expected
    strings should be updated together.
    """
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    s = remove_tags(html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
def test_util(self):
    """Spot-check the small rendering helpers on StorageStatus."""
    page = StorageStatus(None)
    # unknown sizes render as "?" in both the exact and abbreviated forms
    self.failUnlessEqual(page.render_space(None, None), "?")
    self.failUnlessEqual(page.render_abbrev_space(None, None), "?")
    # known sizes: exact byte count vs human-readable abbreviation
    self.failUnlessEqual(page.render_space(None, 10e6), "10000000")
    self.failUnlessEqual(page.render_abbrev_space(None, 10e6), "10.00 MB")
    # remove_prefix strips a matching prefix and returns None otherwise
    self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
    self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)