1 import time, os.path, platform, stat, re, simplejson, struct
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.interfaces import BadWriteEnablerError
24 from allmydata.test.common import LoggingServiceParent
25 from allmydata.test.common_web import WebRenderingMixin
26 from allmydata.test.no_network import NoNetworkServer
27 from allmydata.web.storage import StorageStatus, remove_prefix
32 def __init__(self, ignore_disconnectors=False):
33 self.ignore = ignore_disconnectors
34 self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Record the callback so a test can later simulate a disconnect.
        # NOTE(review): the lines that honor self.ignore and create the
        # marker 'm' are not visible in this view -- confirm upstream.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # Forget a previously registered disconnect callback.
        # NOTE(review): an intervening line (likely the self.ignore guard) is
        # not visible in this view.
        del self.disconnectors[marker]
class FakeStatsProvider:
    """Minimal stand-in for the stats provider a StorageServer expects."""
    # NOTE(review): the (presumably no-op) method bodies are not visible in
    # this view.
    def count(self, name, delta=1):
    def register_producer(self, producer):
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        """Create storage/Bucket/<name> and its tmp/ subdirectory.

        Returns the (incoming, final) share-file paths inside it.
        """
        basedir = os.path.join("storage", "Bucket", name)
        tmpdir = os.path.join(basedir, "tmp")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(tmpdir)
        return os.path.join(tmpdir, "bucket"), os.path.join(basedir, "bucket")
    # No-op StorageServer callbacks used by BucketWriter; their bodies are
    # not visible in this view.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
        # NOTE(review): the statements below belong to make_lease(); its
        # 'def' line and the owner_num assignment are not visible here.
        # Builds a throwaway lease with random secrets and a ~5000s expiry.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def test_create(self):
        # Write 82 bytes across four remote_write calls into a 200-byte
        # BucketWriter.
        incoming, final = self.make_workdir("test_create")
        # NOTE(review): the trailing BucketWriter constructor argument and
        # the bucket-closing lines are not visible in this view.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        # Write three blocks through a BucketWriter, then read them back
        # verbatim through a BucketReader on the finalized share file.
        incoming, final = self.make_workdir("test_readwrite")
        # NOTE(review): the trailing constructor argument and the
        # bw.remote_close() call are not visible in this view.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    def callRemote(self, methname, *args, **kwargs):
        # Dispatch to the target's remote_* method, wrapped in a Deferred to
        # mimic a foolscap remote call.
        # NOTE(review): the 'def _call():' line is not visible in this view.
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        """Create a BucketWriter of *size* bytes under storage/BucketProxy/<name>.

        NOTE(review): the tail of this method (the remaining constructor
        argument, the RemoteBucket wiring and the return statement) is not
        visible in this view.
        """
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    def make_lease(self):
        # Build a throwaway LeaseInfo with random secrets and a ~5000s
        # expiry.
        # NOTE(review): the owner_num assignment is not visible in this view.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    # No-op StorageServer callbacks used by BucketWriter; their bodies are
    # not visible in this view.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
    def test_create(self):
        # A freshly built WriteBucketProxy must provide IStorageBucketWriter.
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        # NOTE(review): several WriteBucketProxy constructor arguments fall
        # on lines not visible in this view.
        bp = WriteBucketProxy(rb,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        """Write a complete share through wbp_class, then read every field
        back through rbp_class and compare.

        NOTE(review): several continuation lines (the list-comprehension
        tails, the wbp_class(...) constructor, and the final callbacks on d)
        are not visible in this view.
        """
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"
        bw, rb, sharefname = self.make_bucket(name, sharesize)
                             uri_extension_size_max=len(uri_extension),
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())
        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
        d.addCallback(_start_reading)
225 def test_readwrite_v1(self):
226 return self._do_test_readwrite("test_readwrite_v1",
227 0x24, WriteBucketProxy, ReadBucketProxy)
229 def test_readwrite_v2(self):
230 return self._do_test_readwrite("test_readwrite_v2",
231 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class Server(unittest.TestCase):
    # NOTE(review): the setUp/tearDown 'def' lines are not visible in this
    # view; the statements below are their bodies (start a logging service
    # parent and a per-test lease-secret counter; stop the parent on
    # teardown).
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under storage/Server/.
        # NOTE(review): the return statement is not visible in this view.
        basedir = os.path.join("storage", "Server", name)
    def create(self, name, reserved_space=0, klass=StorageServer):
        # Build a StorageServer (or the given subclass) rooted in
        # workdir(name) and attach it to the test's service parent.
        # NOTE(review): the 'return ss' line is not visible in this view.
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
253 def test_create(self):
254 self.create("test_create")
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        # Helper: call remote_allocate_buckets with fresh tagged-hash lease
        # secrets derived from the per-test counter.
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): the guard that only builds a FakeCanary when none was
        # supplied is not visible in this view.
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        # A >4GiB share must round-trip; skipped on platforms without
        # efficient sparse files or without enough free disk space.
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
        avail = fileutil.get_available_space('.', 512*2**20)
        # NOTE(review): the free-space condition guarding this SkipTest is
        # not visible in this view.
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
        ss = self.create("test_large_share")
        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))
        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()
        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() call is not visible in
            # this view.
        # NOTE(review): the second os.path.join argument (the shares
        # subdirectory name) is not visible on the next line.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))
        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        # After shares are written, the per-bucket and per-prefix
        # subdirectories of incoming/ must be pruned, while incoming/ itself
        # survives.
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() call is not visible in
            # this view.
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)
329 def test_abort(self):
330 # remote_abort, when called on a writer, should make sure that
331 # the allocated size of the bucket is not counted by the storage
332 # server when accounting for space.
333 ss = self.create("test_abort")
334 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
335 self.failIfEqual(ss.allocated_size(), 0)
337 # Now abort the writers.
338 for writer in writers.itervalues():
339 writer.remote_abort()
340 self.failUnlessEqual(ss.allocated_size(), 0)
    def test_allocate(self):
        # End-to-end immutable allocation: open buckets are not readable,
        # closed buckets are, and re-allocation reports existing shares.
        # NOTE(review): several lines (wb.remote_close()/remote_abort()
        # calls, the b_str assignment, and trailing assertions) are not
        # visible in this view.
        ss = self.create("test_allocate")
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # aborting a bucket that was already closed is a no-op
        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)
        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))
        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        for i,wb in writers2.items():
        for i,wb in writers.items():
    def test_bad_container_version(self):
        # A share file rewritten with an unknown container version must make
        # remote_get_buckets raise UnknownImmutableContainerVersionError.
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        # NOTE(review): w[0].remote_close() and the open()/seek() of the
        # share file are not visible in this view.
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        # reading an unrelated storage index must still work
        ss.remote_get_buckets("allocate")
        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))
    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # fire the recorded disconnect callbacks, abandoning the writers
        for (f,args,kwargs) in canary.disconnectors.values():
            # NOTE(review): the f(*args, **kwargs) invocation and the
            # cleanup lines are not visible in this view.
        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        # With 15000 bytes reported free and some space reserved, the server
        # must stop accepting writers once the remainder is provisionally
        # allocated.
        # NOTE(review): the reserved_space and OVERHEAD assignments, the
        # dict-literal closing braces, and several close/abort loops are not
        # visible in this view.
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)
        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)
        # we abandon the first set, so their provisional allocation should be
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes
        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
        self.failUnlessEqual(len(ss._active_writers), 0)
        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # we have to manually increase available, since we're not doing real
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        # NOTE(review): this is the body of the seek-behavior test; its
        # 'def' line and the seek/write/close statements are not visible in
        # this view. It verifies that seeking past EOF in "rb+" mode and
        # writing extends the file without truncating existing content.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        """Exercise the full immutable-lease lifecycle: create, add, renew,
        cancel, and overlapping uploads.

        NOTE(review): the sharenums/size assignments, the wb.remote_close()
        loop bodies, and a few assertions fall on lines not visible in this
        view.
        """
        ss = self.create("test_leases")
        canary = FakeCanary()
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)
        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)
        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
643 def test_readonly(self):
644 workdir = self.workdir("test_readonly")
645 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
646 ss.setServiceParent(self.sparent)
648 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
649 self.failUnlessEqual(already, set())
650 self.failUnlessEqual(writers, {})
652 stats = ss.get_stats()
653 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
654 if "storage_server.disk_avail" in stats:
655 # Some platforms may not have an API to get disk stats.
656 # But if there are stats, readonly_storage means disk_avail=0
657 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): the wb.remote_close() call is not visible in
            # this view.
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros (discarded writes are never persisted).
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        # Corruption advisories (both the server-level and RIBucketWriter
        # forms) must land as files in corruption-advisories/ containing the
        # type, storage index, share number and the reason text.
        # NOTE(review): the 'report = f.read()' / f.close() lines and the
        # trailing assertions are not visible in this view.
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)
        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()
        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)
class MutableServer(unittest.TestCase):
    # NOTE(review): the setUp/tearDown 'def' lines (and presumably
    # self.sparent.startService()) are not visible in this view; the
    # statements below are their bodies.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under storage/MutableServer/.
        # NOTE(review): the return statement is not visible in this view.
        basedir = os.path.join("storage", "MutableServer", name)
    def create(self, name):
        # Build a StorageServer rooted in workdir(name) and attach it to the
        # test's service parent.
        # NOTE(review): the 'return ss' line is not visible in this view.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
742 def test_create(self):
743 self.create("test_create")
745 def write_enabler(self, we_tag):
746 return hashutil.tagged_hash("we_blah", we_tag)
748 def renew_secret(self, tag):
749 return hashutil.tagged_hash("renew_blah", str(tag))
751 def cancel_secret(self, tag):
752 return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        # Helper: create empty mutable slots for *sharenums* via the
        # test-and-set writev API and assert the write was accepted.
        # NOTE(review): the trailing arguments of the rstaraw call (the
        # testandwritev dict and the read vector) are not visible in this
        # view.
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_bad_magic(self):
        # A mutable share whose magic has been overwritten must make
        # remote_slot_readv raise UnknownMutableContainerVersionError.
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        # NOTE(review): the lines that open fn and overwrite its magic bytes
        # are not visible in this view.
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))
    def test_container_size(self):
        # Resizing a mutable container: growing is fine, exceeding MAX_SIZE
        # raises DataTooLargeError, and shrinking to zero deletes the share.
        # NOTE(review): several call-continuation lines (allocate arguments,
        # read vectors) are not visible in this view.
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})
    def test_allocate(self):
        # Mutable slot basics: empty reads, a first write, reads of the
        # written data, bad write-enabler rejection, and failing test
        # vectors leaving the data untouched.
        # NOTE(review): many call-continuation lines and some expected-value
        # continuations are not visible in this view.
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        # NOTE(review): the reset() helper definition, the tail of each
        # write(...) test-vector call, and the reset() invocations between
        # cases are not visible in this view.
        ss = self.create("test_operators")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
        # lt (fails: stored "11111" is not < "11110"; data untouched)
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1074 def test_readv(self):
1075 ss = self.create("test_readv")
1076 secrets = ( self.write_enabler("we1"),
1077 self.renew_secret("we1"),
1078 self.cancel_secret("we1") )
1079 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1080 write = ss.remote_slot_testv_and_readv_and_writev
1081 read = ss.remote_slot_readv
1082 data = [("%d" % i) * 100 for i in range(3)]
1083 rc = write("si1", secrets,
1084 {0: ([], [(0,data[0])], None),
1085 1: ([], [(0,data[1])], None),
1086 2: ([], [(0,data[2])], None),
1088 self.failUnlessEqual(rc, (True, {}))
1090 answer = read("si1", [], [(0, 10)])
1091 self.failUnlessEqual(answer, {0: ["0"*10],
1095 def compare_leases_without_timestamps(self, leases_a, leases_b):
1096 self.failUnlessEqual(len(leases_a), len(leases_b))
1097 for i in range(len(leases_a)):
1100 self.failUnlessEqual(a.owner_num, b.owner_num)
1101 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1102 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1103 self.failUnlessEqual(a.nodeid, b.nodeid)
1105 def compare_leases(self, leases_a, leases_b):
1106 self.failUnlessEqual(len(leases_a), len(leases_b))
1107 for i in range(len(leases_a)):
1110 self.failUnlessEqual(a.owner_num, b.owner_num)
1111 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1112 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1113 self.failUnlessEqual(a.nodeid, b.nodeid)
1114 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1116 def test_leases(self):
1117 ss = self.create("test_leases")
1119 return ( self.write_enabler("we1"),
1120 self.renew_secret("we1-%d" % n),
1121 self.cancel_secret("we1-%d" % n) )
1122 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1123 write = ss.remote_slot_testv_and_readv_and_writev
1124 read = ss.remote_slot_readv
1125 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1126 self.failUnlessEqual(rc, (True, {}))
1128 # create a random non-numeric file in the bucket directory, to
1129 # exercise the code that's supposed to ignore those.
1130 bucket_dir = os.path.join(self.workdir("test_leases"),
1131 "shares", storage_index_to_dir("si1"))
1132 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1133 f.write("you ought to be ignoring me\n")
1136 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1137 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1139 # add-lease on a missing storage index is silently ignored
1140 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1142 # re-allocate the slots and use the same secrets, that should update
1144 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1145 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1148 ss.remote_renew_lease("si1", secrets(0)[1])
1149 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1151 # now allocate them with a bunch of different secrets, to trigger the
1152 # extended lease code. Use add_lease for one of them.
1153 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1154 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1155 secrets2 = secrets(2)
1156 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1157 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1158 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1159 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1160 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1162 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1164 # cancel one of them
1165 ss.remote_cancel_lease("si1", secrets(5)[2])
1166 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1168 all_leases = list(s0.get_leases())
1169 # and write enough data to expand the container, forcing the server
1170 # to move the leases
1171 write("si1", secrets(0),
1172 {0: ([], [(0,data)], 200), },
1175 # read back the leases, make sure they're still intact.
1176 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1178 ss.remote_renew_lease("si1", secrets(0)[1])
1179 ss.remote_renew_lease("si1", secrets(1)[1])
1180 ss.remote_renew_lease("si1", secrets(2)[1])
1181 ss.remote_renew_lease("si1", secrets(3)[1])
1182 ss.remote_renew_lease("si1", secrets(4)[1])
1183 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1184 # get a new copy of the leases, with the current timestamps. Reading
1185 # data and failing to renew/cancel leases should leave the timestamps
1187 all_leases = list(s0.get_leases())
1188 # renewing with a bogus token should prompt an error message
1190 # examine the exception thus raised, make sure the old nodeid is
1191 # present, to provide for share migration
1192 e = self.failUnlessRaises(IndexError,
1193 ss.remote_renew_lease, "si1",
1196 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1197 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1198 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1200 # same for cancelling
1201 self.failUnlessRaises(IndexError,
1202 ss.remote_cancel_lease, "si1",
1204 self.compare_leases(all_leases, list(s0.get_leases()))
1206 # reading shares should not modify the timestamp
1207 read("si1", [], [(0,200)])
1208 self.compare_leases(all_leases, list(s0.get_leases()))
1210 write("si1", secrets(0),
1211 {0: ([], [(200, "make me bigger")], None)}, [])
1212 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1214 write("si1", secrets(0),
1215 {0: ([], [(500, "make me really bigger")], None)}, [])
1216 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1218 # now cancel them all
1219 ss.remote_cancel_lease("si1", secrets(0)[2])
1220 ss.remote_cancel_lease("si1", secrets(1)[2])
1221 ss.remote_cancel_lease("si1", secrets(2)[2])
1222 ss.remote_cancel_lease("si1", secrets(3)[2])
1224 # the slot should still be there
1225 remaining_shares = read("si1", [], [(0,10)])
1226 self.failUnlessEqual(len(remaining_shares), 1)
1227 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1229 # cancelling a non-existent lease should raise an IndexError
1230 self.failUnlessRaises(IndexError,
1231 ss.remote_cancel_lease, "si1", "nonsecret")
1233 # and the slot should still be there
1234 remaining_shares = read("si1", [], [(0,10)])
1235 self.failUnlessEqual(len(remaining_shares), 1)
1236 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1238 ss.remote_cancel_lease("si1", secrets(4)[2])
1239 # now the slot should be gone
1240 no_shares = read("si1", [], [(0,10)])
1241 self.failUnlessEqual(no_shares, {})
1243 # cancelling a lease on a non-existent share should raise an IndexError
1244 self.failUnlessRaises(IndexError,
1245 ss.remote_cancel_lease, "si2", "nonsecret")
1247 def test_remove(self):
1248 ss = self.create("test_remove")
1249 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1251 readv = ss.remote_slot_readv
1252 writev = ss.remote_slot_testv_and_readv_and_writev
1253 secrets = ( self.write_enabler("we1"),
1254 self.renew_secret("we1"),
1255 self.cancel_secret("we1") )
1256 # delete sh0 by setting its size to zero
1257 answer = writev("si1", secrets,
1260 # the answer should mention all the shares that existed before the
1262 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1263 # but a new read should show only sh1 and sh2
1264 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1267 # delete sh1 by setting its size to zero
1268 answer = writev("si1", secrets,
1271 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1272 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1275 # delete sh2 by setting its size to zero
1276 answer = writev("si1", secrets,
1279 self.failUnlessEqual(answer, (True, {2:[]}) )
1280 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1282 # and the bucket directory should now be gone
1283 si = base32.b2a("si1")
1284 # note: this is a detail of the storage server implementation, and
1285 # may change in the future
1287 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1288 bucketdir = os.path.join(prefixdir, si)
1289 self.failUnless(os.path.exists(prefixdir), prefixdir)
1290 self.failIf(os.path.exists(bucketdir), bucketdir)
class Stats(unittest.TestCase):
    """Tests for StorageServer's latency bookkeeping
    (add_latency / get_latencies)."""
    # NOTE(review): the def setUp/tearDown lines, two inner for-loops, and
    # the workdir/create return statements were missing from this copy and
    # have been restored; verify against upstream history.

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        # the assertions below show that only the most recent 1000 samples
        # per category are retained (10000 added, len()==1000, mean ~9500)
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(20):
            ss.add_latency("write", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)

        # with only 20 samples, the finest percentiles are not computable
        self.failUnlessEqual(len(ss.latencies["write"]), 20)
        self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
        self.failUnless(output["write"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
        self.failUnless(output["write"]["99_0_percentile"] is None, output)
        self.failUnless(output["write"]["99_9_percentile"] is None, output)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
        self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_9_percentile"] is None, output)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(output["get"]["mean"] is None, output)
        self.failUnless(output["get"]["01_0_percentile"] is None, output)
        self.failUnless(output["get"]["10_0_percentile"] is None, output)
        self.failUnless(output["get"]["50_0_percentile"] is None, output)
        self.failUnless(output["get"]["90_0_percentile"] is None, output)
        self.failUnless(output["get"]["95_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_9_percentile"] is None, output)
def remove_tags(s):
    """Strip HTML tags from s and collapse whitespace runs, so tests can
    match rendered-page text without caring about markup."""
    # NOTE(review): the 'def' line and 'return s' were missing from this
    # copy and have been restored.
    s = re.sub(r'<[^>]*>', ' ', s)
    s = re.sub(r'\s+', ' ', s)
    return s
class MyBucketCountingCrawler(BucketCountingCrawler):
    """Bucket-counting crawler that fires one Deferred from hook_ds each
    time a prefix finishes, letting tests observe crawler progress."""
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # restored: this copy popped the Deferred without guarding an empty
        # list and never fired it; verify against upstream history.
        if self.hook_ds:
            d = self.hook_ds.pop(0)
            d.callback(None)
class MyStorageServer(StorageServer):
    """StorageServer variant that installs the hook-firing bucket counter
    instead of the stock one."""
    def add_bucket_counter(self):
        # Same state file location as the base class; only the crawler
        # class differs.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter = counter
        counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Tests for the bucket-counting crawler and its web status rendering."""
    # NOTE(review): the def setUp/tearDown lines, 'return d2'/'return d'
    # statements, '_watch' def lines, and the final callback firing were
    # missing from this copy and have been restored; verify against
    # upstream history.

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_bucket_counter(self):
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        w = StorageStatus(ss)

        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)

        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        # first prefix

        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                d2 = fireEventually()
                d2.addCallback(_check)
                return d2
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)

        # now give it enough time to complete a full cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)
        return d

    def test_bucket_counter_cleanup(self):
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        d = fireEventually()

        def _after_first_prefix(ignored):
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
                return d2
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)

        # now give it enough time to complete a cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)
        return d

    def test_bucket_counter_eta(self):
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)

        d = defer.Deferred()

        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)

        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)

        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
            d.callback("done")

        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)

        ss.setServiceParent(self.s)
        return d
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """Lease crawler that a test can ask to pause itself right after the
    first bucket it processes."""
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        # Delegate to the real crawler first, then force an immediate
        # yield if the one-shot pause flag is set.
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # Once the one-bucket pause has been consumed, run at full speed.
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
class BrokenStatResults:
    """Bare attribute holder used to simulate os.stat() results that lack
    some fields (see No_ST_BLOCKS_LeaseCheckingCrawler)."""
    pass
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    """Lease crawler whose stat() results omit st_blocks, simulating
    platforms where that attribute is unavailable."""
    # NOTE(review): the 'def stat' line, the os.stat call, the two
    # 'continue' statements, and 'return bsr' were missing from this copy
    # and have been restored; verify against upstream history.
    def stat(self, fn):
        s = os.stat(fn)
        bsr = BrokenStatResults()
        for attrname in dir(s):
            if attrname.startswith("_"):
                continue
            if attrname == "st_blocks":
                continue
            setattr(bsr, attrname, getattr(s, attrname))
        return bsr
class InstrumentedStorageServer(StorageServer):
    # StorageServer whose lease crawler can pause after its first bucket
    # (see InstrumentedLeaseCheckingCrawler earlier in this file).
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # StorageServer whose lease crawler sees stat() results without
    # st_blocks (see No_ST_BLOCKS_LeaseCheckingCrawler earlier in this file).
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    """Tests for the lease-expiration crawler (LeaseCheckingCrawler)."""
    # NOTE(review): the def setUp/tearDown lines were missing from this
    # copy and have been restored; verify against upstream history.

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()
1577 def make_shares(self, ss):
1579 return (si, hashutil.tagged_hash("renew", si),
1580 hashutil.tagged_hash("cancel", si))
1581 def make_mutable(si):
1582 return (si, hashutil.tagged_hash("renew", si),
1583 hashutil.tagged_hash("cancel", si),
1584 hashutil.tagged_hash("write-enabler", si))
1585 def make_extra_lease(si, num):
1586 return (hashutil.tagged_hash("renew-%d" % num, si),
1587 hashutil.tagged_hash("cancel-%d" % num, si))
1589 immutable_si_0, rs0, cs0 = make("\x00" * 16)
1590 immutable_si_1, rs1, cs1 = make("\x01" * 16)
1591 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1592 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1593 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1594 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1596 canary = FakeCanary()
1597 # note: 'tahoe debug dump-share' will not handle this file, since the
1598 # inner contents are not a valid CHK share
1599 data = "\xff" * 1000
1601 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1603 w[0].remote_write(0, data)
1606 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1608 w[0].remote_write(0, data)
1610 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1612 writev = ss.remote_slot_testv_and_readv_and_writev
1613 writev(mutable_si_2, (we2, rs2, cs2),
1614 {0: ([], [(0,data)], len(data))}, [])
1615 writev(mutable_si_3, (we3, rs3, cs3),
1616 {0: ([], [(0,data)], len(data))}, [])
1617 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1619 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1620 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1621 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1623 def test_basic(self):
1624 basedir = "storage/LeaseCrawler/basic"
1625 fileutil.make_dirs(basedir)
1626 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1627 # make it start sooner than usual.
1628 lc = ss.lease_checker
1631 lc.stop_after_first_bucket = True
1632 webstatus = StorageStatus(ss)
1634 # create a few shares, with some leases on them
1635 self.make_shares(ss)
1636 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1638 # add a non-sharefile to exercise another code path
1639 fn = os.path.join(ss.sharedir,
1640 storage_index_to_dir(immutable_si_0),
1643 f.write("I am not a share.\n")
1646 # this is before the crawl has started, so we're not in a cycle yet
1647 initial_state = lc.get_state()
1648 self.failIf(lc.get_progress()["cycle-in-progress"])
1649 self.failIfIn("cycle-to-date", initial_state)
1650 self.failIfIn("estimated-remaining-cycle", initial_state)
1651 self.failIfIn("estimated-current-cycle", initial_state)
1652 self.failUnlessIn("history", initial_state)
1653 self.failUnlessEqual(initial_state["history"], {})
1655 ss.setServiceParent(self.s)
1659 d = fireEventually()
1661 # now examine the state right after the first bucket has been
1663 def _after_first_bucket(ignored):
1664 initial_state = lc.get_state()
1665 if "cycle-to-date" not in initial_state:
1666 d2 = fireEventually()
1667 d2.addCallback(_after_first_bucket)
1669 self.failUnlessIn("cycle-to-date", initial_state)
1670 self.failUnlessIn("estimated-remaining-cycle", initial_state)
1671 self.failUnlessIn("estimated-current-cycle", initial_state)
1672 self.failUnlessIn("history", initial_state)
1673 self.failUnlessEqual(initial_state["history"], {})
1675 so_far = initial_state["cycle-to-date"]
1676 self.failUnlessEqual(so_far["expiration-enabled"], False)
1677 self.failUnlessIn("configured-expiration-mode", so_far)
1678 self.failUnlessIn("lease-age-histogram", so_far)
1679 lah = so_far["lease-age-histogram"]
1680 self.failUnlessEqual(type(lah), list)
1681 self.failUnlessEqual(len(lah), 1)
1682 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1683 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1684 self.failUnlessEqual(so_far["corrupt-shares"], [])
1685 sr1 = so_far["space-recovered"]
1686 self.failUnlessEqual(sr1["examined-buckets"], 1)
1687 self.failUnlessEqual(sr1["examined-shares"], 1)
1688 self.failUnlessEqual(sr1["actual-shares"], 0)
1689 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1690 self.failUnlessEqual(sr1["original-sharebytes"], 0)
1691 left = initial_state["estimated-remaining-cycle"]
1692 sr2 = left["space-recovered"]
1693 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1694 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1695 self.failIfEqual(sr2["actual-shares"], None)
1696 self.failIfEqual(sr2["configured-diskbytes"], None)
1697 self.failIfEqual(sr2["original-sharebytes"], None)
1698 d.addCallback(_after_first_bucket)
1699 d.addCallback(lambda ign: self.render1(webstatus))
1700 def _check_html_in_cycle(html):
1701 s = remove_tags(html)
1702 self.failUnlessIn("So far, this cycle has examined "
1703 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1704 self.failUnlessIn("and has recovered: "
1705 "0 shares, 0 buckets (0 mutable / 0 immutable), "
1706 "0 B (0 B / 0 B)", s)
1707 self.failUnlessIn("If expiration were enabled, "
1708 "we would have recovered: "
1709 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1710 " 0 B (0 B / 0 B) by now", s)
1711 self.failUnlessIn("and the remainder of this cycle "
1712 "would probably recover: "
1713 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1714 " 0 B (0 B / 0 B)", s)
1715 self.failUnlessIn("and the whole cycle would probably recover: "
1716 "0 shares, 0 buckets (0 mutable / 0 immutable),"
1717 " 0 B (0 B / 0 B)", s)
1718 self.failUnlessIn("if we were strictly using each lease's default "
1719 "31-day lease lifetime", s)
1720 self.failUnlessIn("this cycle would be expected to recover: ", s)
1721 d.addCallback(_check_html_in_cycle)
1723 # wait for the crawler to finish the first cycle. Nothing should have
1726 return bool(lc.get_state()["last-cycle-finished"] is not None)
1727 d.addCallback(lambda ign: self.poll(_wait))
1729 def _after_first_cycle(ignored):
1731 self.failIf("cycle-to-date" in s)
1732 self.failIf("estimated-remaining-cycle" in s)
1733 self.failIf("estimated-current-cycle" in s)
1734 last = s["history"][0]
1735 self.failUnlessIn("cycle-start-finish-times", last)
1736 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1737 self.failUnlessEqual(last["expiration-enabled"], False)
1738 self.failUnlessIn("configured-expiration-mode", last)
1740 self.failUnlessIn("lease-age-histogram", last)
1741 lah = last["lease-age-histogram"]
1742 self.failUnlessEqual(type(lah), list)
1743 self.failUnlessEqual(len(lah), 1)
1744 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1746 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1747 self.failUnlessEqual(last["corrupt-shares"], [])
1749 rec = last["space-recovered"]
1750 self.failUnlessEqual(rec["examined-buckets"], 4)
1751 self.failUnlessEqual(rec["examined-shares"], 4)
1752 self.failUnlessEqual(rec["actual-buckets"], 0)
1753 self.failUnlessEqual(rec["original-buckets"], 0)
1754 self.failUnlessEqual(rec["configured-buckets"], 0)
1755 self.failUnlessEqual(rec["actual-shares"], 0)
1756 self.failUnlessEqual(rec["original-shares"], 0)
1757 self.failUnlessEqual(rec["configured-shares"], 0)
1758 self.failUnlessEqual(rec["actual-diskbytes"], 0)
1759 self.failUnlessEqual(rec["original-diskbytes"], 0)
1760 self.failUnlessEqual(rec["configured-diskbytes"], 0)
1761 self.failUnlessEqual(rec["actual-sharebytes"], 0)
1762 self.failUnlessEqual(rec["original-sharebytes"], 0)
1763 self.failUnlessEqual(rec["configured-sharebytes"], 0)
1765 def _get_sharefile(si):
1766 return list(ss._iter_share_files(si))[0]
1767 def count_leases(si):
1768 return len(list(_get_sharefile(si).get_leases()))
1769 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1770 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1771 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1772 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1773 d.addCallback(_after_first_cycle)
1774 d.addCallback(lambda ign: self.render1(webstatus))
1775 def _check_html(html):
1776 s = remove_tags(html)
1777 self.failUnlessIn("recovered: 0 shares, 0 buckets "
1778 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1779 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1780 "(2 mutable / 2 immutable),", s)
1781 self.failUnlessIn("but expiration was not enabled", s)
1782 d.addCallback(_check_html)
1783 d.addCallback(lambda ign: self.render_json(webstatus))
1784 def _check_json(json):
1785 data = simplejson.loads(json)
1786 self.failUnlessIn("lease-checker", data)
1787 self.failUnlessIn("lease-checker-progress", data)
1788 d.addCallback(_check_json)
1791 def backdate_lease(self, sf, renew_secret, new_expire_time):
1792 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1793 # "renew" a lease with a new_expire_time that is older than what the
1794 # current lease has), so we have to reach inside it.
1795 for i,lease in enumerate(sf.get_leases()):
1796 if lease.renew_secret == renew_secret:
1797 lease.expiration_time = new_expire_time
1798 f = open(sf.home, 'rb+')
1799 sf._write_lease_record(f, i, lease)
1802 raise IndexError("unable to renew non-existent lease")
# Exercise age-based lease expiration: leases older than
# expiration_override_lease_duration (2000s) must be removed, and shares
# whose last lease expires must be deleted.
# NOTE(review): the original-file numbering embedded in these lines jumps
# (e.g. 1814->1816, 1844->1847, 1889->1892), so several statements
# (including the `now = time.time()` assignment and the `def _wait():`
# header) are missing from this chunk. Code below is preserved byte-for-byte.
1804 def test_expire_age(self):
1805 basedir = "storage/LeaseCrawler/expire_age"
1806 fileutil.make_dirs(basedir)
1807 # setting expiration_time to 2000 means that any lease which is more
1808 # than 2000s old will be expired.
1809 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1810 expiration_enabled=True,
1811 expiration_mode="age",
1812 expiration_override_lease_duration=2000)
1813 # make it start sooner than usual.
1814 lc = ss.lease_checker
1816 lc.stop_after_first_bucket = True
1817 webstatus = StorageStatus(ss)
1819 # create a few shares, with some leases on them
1820 self.make_shares(ss)
1821 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# Small helpers over the server's share iterator; count_leases presumes
# exactly one share file per SI (it reads index [0]).
1823 def count_shares(si):
1824 return len(list(ss._iter_share_files(si)))
1825 def _get_sharefile(si):
1826 return list(ss._iter_share_files(si))[0]
1827 def count_leases(si):
1828 return len(list(_get_sharefile(si).get_leases()))
# Baseline: four buckets, each with one share; SIs 1 and 3 carry a second lease.
1830 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1831 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1832 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1833 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1834 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1835 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1836 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1837 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1839 # artificially crank back the expiration time on the first lease of
1840 # each share, to make it look like it expired already (age=1000s).
1841 # Some shares have an extra lease which is set to expire at the
1842 # default time in 31 days from now (age=31days). We then run the
1843 # crawler, which will expire the first lease, making some shares get
1844 # deleted and others stay alive (with one remaining lease)
# NOTE(review): `now` is used below but its assignment (original line
# ~1845/1846) is missing from this chunk.
1847 sf0 = _get_sharefile(immutable_si_0)
1848 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1849 sf0_size = os.stat(sf0.home).st_size
1851 # immutable_si_1 gets an extra lease
1852 sf1 = _get_sharefile(immutable_si_1)
1853 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1855 sf2 = _get_sharefile(mutable_si_2)
1856 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1857 sf2_size = os.stat(sf2.home).st_size
1859 # mutable_si_3 gets an extra lease
1860 sf3 = _get_sharefile(mutable_si_3)
1861 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1863 ss.setServiceParent(self.s)
1865 d = fireEventually()
1866 # examine the state right after the first bucket has been processed
# Polls via chained fireEventually() until the crawler reports a cycle
# in progress. NOTE(review): the `return d2` line that should follow the
# inner addCallback appears to be missing from this chunk.
1867 def _after_first_bucket(ignored):
1868 p = lc.get_progress()
1869 if not p["cycle-in-progress"]:
1870 d2 = fireEventually()
1871 d2.addCallback(_after_first_bucket)
1873 d.addCallback(_after_first_bucket)
1874 d.addCallback(lambda ign: self.render1(webstatus))
1875 def _check_html_in_cycle(html):
1876 s = remove_tags(html)
1877 # the first bucket encountered gets deleted, and its prefix
1878 # happens to be about 1/5th of the way through the ring, so the
1879 # predictor thinks we'll have 5 shares and that we'll delete them
1880 # all. This part of the test depends upon the SIs landing right
1881 # where they do now.
1882 self.failUnlessIn("The remainder of this cycle is expected to "
1883 "recover: 4 shares, 4 buckets", s)
1884 self.failUnlessIn("The whole cycle is expected to examine "
1885 "5 shares in 5 buckets and to recover: "
1886 "5 shares, 5 buckets", s)
1887 d.addCallback(_check_html_in_cycle)
1889 # wait for the crawler to finish the first cycle. Two shares should
# NOTE(review): the `def _wait():` header for this poll predicate is
# missing from this chunk (numbering jumps 1889->1892).
1892 return bool(lc.get_state()["last-cycle-finished"] is not None)
1893 d.addCallback(lambda ign: self.poll(_wait))
# After the full cycle: SIs 0 and 2 (single expired lease) are deleted;
# SIs 1 and 3 survive with their one unexpired lease.
1895 def _after_first_cycle(ignored):
1896 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1897 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1898 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1899 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1900 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1901 self.failUnlessEqual(count_leases(mutable_si_3), 1)
# NOTE(review): the assignment of `s` (presumably lc.get_state()) is
# missing from this chunk (numbering jumps 1901->1904).
1904 last = s["history"][0]
1906 self.failUnlessEqual(last["expiration-enabled"], True)
1907 self.failUnlessEqual(last["configured-expiration-mode"],
1908 ("age", 2000, None, ("mutable", "immutable")))
1909 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1911 rec = last["space-recovered"]
1912 self.failUnlessEqual(rec["examined-buckets"], 4)
1913 self.failUnlessEqual(rec["examined-shares"], 4)
1914 self.failUnlessEqual(rec["actual-buckets"], 2)
1915 self.failUnlessEqual(rec["original-buckets"], 2)
1916 self.failUnlessEqual(rec["configured-buckets"], 2)
1917 self.failUnlessEqual(rec["actual-shares"], 2)
1918 self.failUnlessEqual(rec["original-shares"], 2)
1919 self.failUnlessEqual(rec["configured-shares"], 2)
1920 size = sf0_size + sf2_size
1921 self.failUnlessEqual(rec["actual-sharebytes"], size)
1922 self.failUnlessEqual(rec["original-sharebytes"], size)
1923 self.failUnlessEqual(rec["configured-sharebytes"], size)
1924 # different platforms have different notions of "blocks used by
1925 # this file", so merely assert that it's a number
1926 self.failUnless(rec["actual-diskbytes"] >= 0,
1927 rec["actual-diskbytes"])
1928 self.failUnless(rec["original-diskbytes"] >= 0,
1929 rec["original-diskbytes"])
1930 self.failUnless(rec["configured-diskbytes"] >= 0,
1931 rec["configured-diskbytes"])
1932 d.addCallback(_after_first_cycle)
1933 d.addCallback(lambda ign: self.render1(webstatus))
1934 def _check_html(html):
1935 s = remove_tags(html)
1936 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1937 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1938 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1939 d.addCallback(_check_html)
# Exercise cutoff-date-based lease expiration: leases renewed before
# `then` (2000s ago) are expired. Mirrors test_expire_age but checks that
# "original-*" space-recovered counters stay 0 in cutoff-date mode.
# NOTE(review): embedded original-file numbering jumps in this chunk, so
# some statements (`now = time.time()`, the `def _wait():` header, the
# assignment of `s`, the final `return d`) are missing. Code preserved
# byte-for-byte.
1942 def test_expire_cutoff_date(self):
1943 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1944 fileutil.make_dirs(basedir)
1945 # setting cutoff-date to 2000 seconds ago means that any lease which
1946 # is more than 2000s old will be expired.
# NOTE(review): `now` is referenced but its assignment (original ~1947)
# is missing from this chunk.
1948 then = int(now - 2000)
1949 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1950 expiration_enabled=True,
1951 expiration_mode="cutoff-date",
1952 expiration_cutoff_date=then)
1953 # make it start sooner than usual.
1954 lc = ss.lease_checker
1956 lc.stop_after_first_bucket = True
1957 webstatus = StorageStatus(ss)
1959 # create a few shares, with some leases on them
1960 self.make_shares(ss)
1961 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1963 def count_shares(si):
1964 return len(list(ss._iter_share_files(si)))
1965 def _get_sharefile(si):
1966 return list(ss._iter_share_files(si))[0]
1967 def count_leases(si):
1968 return len(list(_get_sharefile(si).get_leases()))
1970 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1971 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1972 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1973 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1974 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1975 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1976 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1977 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1979 # artificially crank back the expiration time on the first lease of
1980 # each share, to make it look like was renewed 3000s ago. To achieve
1981 # this, we need to set the expiration time to now-3000+31days. This
1982 # will change when the lease format is improved to contain both
1983 # create/renew time and duration.
1984 new_expiration_time = now - 3000 + 31*24*60*60
1986 # Some shares have an extra lease which is set to expire at the
1987 # default time in 31 days from now (age=31days). We then run the
1988 # crawler, which will expire the first lease, making some shares get
1989 # deleted and others stay alive (with one remaining lease)
1991 sf0 = _get_sharefile(immutable_si_0)
1992 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1993 sf0_size = os.stat(sf0.home).st_size
1995 # immutable_si_1 gets an extra lease
1996 sf1 = _get_sharefile(immutable_si_1)
1997 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1999 sf2 = _get_sharefile(mutable_si_2)
2000 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2001 sf2_size = os.stat(sf2.home).st_size
2003 # mutable_si_3 gets an extra lease
2004 sf3 = _get_sharefile(mutable_si_3)
2005 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2007 ss.setServiceParent(self.s)
2009 d = fireEventually()
2010 # examine the state right after the first bucket has been processed
2011 def _after_first_bucket(ignored):
2012 p = lc.get_progress()
2013 if not p["cycle-in-progress"]:
2014 d2 = fireEventually()
2015 d2.addCallback(_after_first_bucket)
2017 d.addCallback(_after_first_bucket)
2018 d.addCallback(lambda ign: self.render1(webstatus))
2019 def _check_html_in_cycle(html):
2020 s = remove_tags(html)
2021 # the first bucket encountered gets deleted, and its prefix
2022 # happens to be about 1/5th of the way through the ring, so the
2023 # predictor thinks we'll have 5 shares and that we'll delete them
2024 # all. This part of the test depends upon the SIs landing right
2025 # where they do now.
2026 self.failUnlessIn("The remainder of this cycle is expected to "
2027 "recover: 4 shares, 4 buckets", s)
2028 self.failUnlessIn("The whole cycle is expected to examine "
2029 "5 shares in 5 buckets and to recover: "
2030 "5 shares, 5 buckets", s)
2031 d.addCallback(_check_html_in_cycle)
2033 # wait for the crawler to finish the first cycle. Two shares should
2036 return bool(lc.get_state()["last-cycle-finished"] is not None)
2037 d.addCallback(lambda ign: self.poll(_wait))
2039 def _after_first_cycle(ignored):
2040 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2041 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2042 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2043 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2044 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2045 self.failUnlessEqual(count_leases(mutable_si_3), 1)
2048 last = s["history"][0]
2050 self.failUnlessEqual(last["expiration-enabled"], True)
2051 self.failUnlessEqual(last["configured-expiration-mode"],
2052 ("cutoff-date", None, then,
2053 ("mutable", "immutable")))
2054 self.failUnlessEqual(last["leases-per-share-histogram"],
# NOTE(review): the histogram argument continuation (original ~2055) is
# missing here.
2057 rec = last["space-recovered"]
2058 self.failUnlessEqual(rec["examined-buckets"], 4)
2059 self.failUnlessEqual(rec["examined-shares"], 4)
2060 self.failUnlessEqual(rec["actual-buckets"], 2)
# In cutoff-date mode the "original-*" counters (what the default 31-day
# policy would have recovered) are expected to be zero.
2061 self.failUnlessEqual(rec["original-buckets"], 0)
2062 self.failUnlessEqual(rec["configured-buckets"], 2)
2063 self.failUnlessEqual(rec["actual-shares"], 2)
2064 self.failUnlessEqual(rec["original-shares"], 0)
2065 self.failUnlessEqual(rec["configured-shares"], 2)
2066 size = sf0_size + sf2_size
2067 self.failUnlessEqual(rec["actual-sharebytes"], size)
2068 self.failUnlessEqual(rec["original-sharebytes"], 0)
2069 self.failUnlessEqual(rec["configured-sharebytes"], size)
2070 # different platforms have different notions of "blocks used by
2071 # this file", so merely assert that it's a number
2072 self.failUnless(rec["actual-diskbytes"] >= 0,
2073 rec["actual-diskbytes"])
2074 self.failUnless(rec["original-diskbytes"] >= 0,
2075 rec["original-diskbytes"])
2076 self.failUnless(rec["configured-diskbytes"] >= 0,
2077 rec["configured-diskbytes"])
2078 d.addCallback(_after_first_cycle)
2079 d.addCallback(lambda ign: self.render1(webstatus))
2080 def _check_html(html):
2081 s = remove_tags(html)
2082 self.failUnlessIn("Expiration Enabled:"
2083 " expired leases will be removed", s)
2084 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2085 substr = "Leases created or last renewed before %s will be considered expired." % date
2086 self.failUnlessIn(substr, s)
2087 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2088 d.addCallback(_check_html)
# With expiration_sharetypes=("immutable",), only immutable shares are
# expired; mutable shares keep all their (expirable) leases.
# NOTE(review): embedded numbering jumps here too — `now = time.time()`,
# crawler speed-up lines, the `def _wait():` header, and the final
# `return d` are missing from this chunk. Code preserved byte-for-byte.
2091 def test_only_immutable(self):
2092 basedir = "storage/LeaseCrawler/only_immutable"
2093 fileutil.make_dirs(basedir)
2095 then = int(now - 2000)
2096 ss = StorageServer(basedir, "\x00" * 20,
2097 expiration_enabled=True,
2098 expiration_mode="cutoff-date",
2099 expiration_cutoff_date=then,
2100 expiration_sharetypes=("immutable",))
2101 lc = ss.lease_checker
2103 webstatus = StorageStatus(ss)
2105 self.make_shares(ss)
2106 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2107 # set all leases to be expirable
2108 new_expiration_time = now - 3000 + 31*24*60*60
2110 def count_shares(si):
2111 return len(list(ss._iter_share_files(si)))
2112 def _get_sharefile(si):
2113 return list(ss._iter_share_files(si))[0]
2114 def count_leases(si):
2115 return len(list(_get_sharefile(si).get_leases()))
2117 sf0 = _get_sharefile(immutable_si_0)
2118 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2119 sf1 = _get_sharefile(immutable_si_1)
2120 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2121 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2122 sf2 = _get_sharefile(mutable_si_2)
2123 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2124 sf3 = _get_sharefile(mutable_si_3)
2125 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2126 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2128 ss.setServiceParent(self.s)
2130 return bool(lc.get_state()["last-cycle-finished"] is not None)
2131 d = self.poll(_wait)
# Immutable shares are gone; mutable shares untouched despite expired leases.
2133 def _after_first_cycle(ignored):
2134 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2135 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2136 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2137 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2138 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2139 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2140 d.addCallback(_after_first_cycle)
2141 d.addCallback(lambda ign: self.render1(webstatus))
2142 def _check_html(html):
2143 s = remove_tags(html)
2144 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2145 d.addCallback(_check_html)
# Mirror of test_only_immutable: with expiration_sharetypes=("mutable",)
# only mutable shares are expired; immutable shares keep their leases.
# NOTE(review): same missing lines as the sibling test (`now`, crawler
# speed-up, `def _wait():`, final `return d`). Code preserved byte-for-byte.
2148 def test_only_mutable(self):
2149 basedir = "storage/LeaseCrawler/only_mutable"
2150 fileutil.make_dirs(basedir)
2152 then = int(now - 2000)
2153 ss = StorageServer(basedir, "\x00" * 20,
2154 expiration_enabled=True,
2155 expiration_mode="cutoff-date",
2156 expiration_cutoff_date=then,
2157 expiration_sharetypes=("mutable",))
2158 lc = ss.lease_checker
2160 webstatus = StorageStatus(ss)
2162 self.make_shares(ss)
2163 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2164 # set all leases to be expirable
2165 new_expiration_time = now - 3000 + 31*24*60*60
2167 def count_shares(si):
2168 return len(list(ss._iter_share_files(si)))
2169 def _get_sharefile(si):
2170 return list(ss._iter_share_files(si))[0]
2171 def count_leases(si):
2172 return len(list(_get_sharefile(si).get_leases()))
2174 sf0 = _get_sharefile(immutable_si_0)
2175 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2176 sf1 = _get_sharefile(immutable_si_1)
2177 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2178 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2179 sf2 = _get_sharefile(mutable_si_2)
2180 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2181 sf3 = _get_sharefile(mutable_si_3)
2182 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2183 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2185 ss.setServiceParent(self.s)
2187 return bool(lc.get_state()["last-cycle-finished"] is not None)
2188 d = self.poll(_wait)
# Mutable shares are gone; immutable shares untouched despite expired leases.
2190 def _after_first_cycle(ignored):
2191 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2192 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2193 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2194 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2195 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2196 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2197 d.addCallback(_after_first_cycle)
2198 d.addCallback(lambda ign: self.render1(webstatus))
2199 def _check_html(html):
2200 s = remove_tags(html)
2201 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2202 d.addCallback(_check_html)
# An unrecognized expiration_mode must make the StorageServer constructor
# raise ValueError with a helpful message.
2205 def test_bad_mode(self):
2206 basedir = "storage/LeaseCrawler/bad_mode"
2207 fileutil.make_dirs(basedir)
2208 e = self.failUnlessRaises(ValueError,
2209 StorageServer, basedir, "\x00" * 20,
2210 expiration_mode="bogus")
2211 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
# time_format.parse_duration must understand day/month/year suffixes
# (singular/plural, with or without a space) and reject unknown units.
# NOTE(review): the DAY/MONTH/YEAR constant definitions (original lines
# ~2214-2216) are missing from this chunk; the names are used below.
2213 def test_parse_duration(self):
2217 p = time_format.parse_duration
2218 self.failUnlessEqual(p("7days"), 7*DAY)
2219 self.failUnlessEqual(p("31day"), 31*DAY)
2220 self.failUnlessEqual(p("60 days"), 60*DAY)
2221 self.failUnlessEqual(p("2mo"), 2*MONTH)
2222 self.failUnlessEqual(p("3 month"), 3*MONTH)
2223 self.failUnlessEqual(p("2years"), 2*YEAR)
2224 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2225 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
# time_format.parse_date must return an int UNIX timestamp for an
# ISO-8601 date (midnight UTC of that day).
2227 def test_parse_date(self):
2228 p = time_format.parse_date
2229 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2230 self.failUnlessEqual(p("2009-03-18"), 1237334400)
# The lease checker keeps a bounded history: after 15+ cycles only the
# most recent 10 entries (cycles 6..15) remain.
# NOTE(review): embedded numbering jumps (2237->2241, 2248->2253,
# 2255->2258) show that the crawler speed-up lines, the poll predicate's
# return, the `h = ...` assignment, and the final `return d` are missing
# from this chunk. Code preserved byte-for-byte.
2232 def test_limited_history(self):
2233 basedir = "storage/LeaseCrawler/limited_history"
2234 fileutil.make_dirs(basedir)
2235 ss = StorageServer(basedir, "\x00" * 20)
2236 # make it start sooner than usual.
2237 lc = ss.lease_checker
2241 # create a few shares, with some leases on them
2242 self.make_shares(ss)
2244 ss.setServiceParent(self.s)
2246 def _wait_until_15_cycles_done():
2247 last = lc.state["last-cycle-finished"]
2248 if last is not None and last >= 15:
2253 d = self.poll(_wait_until_15_cycles_done)
2255 def _check(ignored):
2258 self.failUnlessEqual(len(h), 10)
2259 self.failUnlessEqual(max(h.keys()), 15)
2260 self.failUnlessEqual(min(h.keys()), 6)
2261 d.addCallback(_check)
# When the crawler is interrupted before finishing its first prefix, the
# progress estimator cannot predict the future: every "space-recovered"
# estimate field must be None.
# NOTE(review): numbering jumps (2269->2271, 2285->2288, 2290->2292)
# indicate missing lines, including the `s = lc.get_state()` assignment
# and the `return d2` for the re-poll branch. Code preserved byte-for-byte.
2264 def test_unpredictable_future(self):
2265 basedir = "storage/LeaseCrawler/unpredictable_future"
2266 fileutil.make_dirs(basedir)
2267 ss = StorageServer(basedir, "\x00" * 20)
2268 # make it start sooner than usual.
2269 lc = ss.lease_checker
2271 lc.cpu_slice = -1.0 # stop quickly
2273 self.make_shares(ss)
2275 ss.setServiceParent(self.s)
2277 d = fireEventually()
2278 def _check(ignored):
2279 # this should fire after the first bucket is complete, but before
2280 # the first prefix is complete, so the progress-measurer won't
2281 # think we've gotten far enough to raise our percent-complete
2282 # above 0%, triggering the cannot-predict-the-future code in
2283 # expirer.py . This will have to change if/when the
2284 # progress-measurer gets smart enough to count buckets (we'll
2285 # have to interrupt it even earlier, before it's finished the
2288 if "cycle-to-date" not in s:
2289 d2 = fireEventually()
2290 d2.addCallback(_check)
2292 self.failUnlessIn("cycle-to-date", s)
2293 self.failUnlessIn("estimated-remaining-cycle", s)
2294 self.failUnlessIn("estimated-current-cycle", s)
2296 left = s["estimated-remaining-cycle"]["space-recovered"]
2297 self.failUnlessEqual(left["actual-buckets"], None)
2298 self.failUnlessEqual(left["original-buckets"], None)
2299 self.failUnlessEqual(left["configured-buckets"], None)
2300 self.failUnlessEqual(left["actual-shares"], None)
2301 self.failUnlessEqual(left["original-shares"], None)
2302 self.failUnlessEqual(left["configured-shares"], None)
2303 self.failUnlessEqual(left["actual-diskbytes"], None)
2304 self.failUnlessEqual(left["original-diskbytes"], None)
2305 self.failUnlessEqual(left["configured-diskbytes"], None)
2306 self.failUnlessEqual(left["actual-sharebytes"], None)
2307 self.failUnlessEqual(left["original-sharebytes"], None)
2308 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): `full` is read from "estimated-remaining-cycle", same as
# `left` above — presumably this was meant to be
# "estimated-current-cycle"; verify against the original file.
2310 full = s["estimated-remaining-cycle"]["space-recovered"]
2311 self.failUnlessEqual(full["actual-buckets"], None)
2312 self.failUnlessEqual(full["original-buckets"], None)
2313 self.failUnlessEqual(full["configured-buckets"], None)
2314 self.failUnlessEqual(full["actual-shares"], None)
2315 self.failUnlessEqual(full["original-shares"], None)
2316 self.failUnlessEqual(full["configured-shares"], None)
2317 self.failUnlessEqual(full["actual-diskbytes"], None)
2318 self.failUnlessEqual(full["original-diskbytes"], None)
2319 self.failUnlessEqual(full["configured-diskbytes"], None)
2320 self.failUnlessEqual(full["actual-sharebytes"], None)
2321 self.failUnlessEqual(full["original-sharebytes"], None)
2322 self.failUnlessEqual(full["configured-sharebytes"], None)
2324 d.addCallback(_check)
# On platforms without os.stat().st_blocks, the expirer must fall back to
# reporting diskbytes == sharebytes (No_ST_BLOCKS_StorageServer simulates
# the missing field).
# NOTE(review): numbering jumps (2334->2337, 2338->2341, 2342->2344,
# 2347->2349) indicate missing lines: comment continuation, crawler
# speed-up, the `def _wait():` header, `s = lc.get_state()`, and the
# final `return d`. Code preserved byte-for-byte.
2327 def test_no_st_blocks(self):
2328 basedir = "storage/LeaseCrawler/no_st_blocks"
2329 fileutil.make_dirs(basedir)
2330 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2331 expiration_mode="age",
2332 expiration_override_lease_duration=-1000)
2333 # a negative expiration_time= means the "configured-"
2334 # space-recovered counts will be non-zero, since all shares will have
2337 # make it start sooner than usual.
2338 lc = ss.lease_checker
2341 self.make_shares(ss)
2342 ss.setServiceParent(self.s)
2344 return bool(lc.get_state()["last-cycle-finished"] is not None)
2345 d = self.poll(_wait)
2347 def _check(ignored):
2349 last = s["history"][0]
2350 rec = last["space-recovered"]
2351 self.failUnlessEqual(rec["configured-buckets"], 4)
2352 self.failUnlessEqual(rec["configured-shares"], 4)
2353 self.failUnless(rec["configured-sharebytes"] > 0,
2354 rec["configured-sharebytes"])
2355 # without the .st_blocks field in os.stat() results, we should be
2356 # reporting diskbytes==sharebytes
2357 self.failUnlessEqual(rec["configured-sharebytes"],
2358 rec["configured-diskbytes"])
2359 d.addCallback(_check)
# A share file with corrupted magic must be reported as a corrupt share
# (in cycle-to-date state, JSON, and HTML) without stopping the crawler;
# the remaining healthy shares are still examined.
# NOTE(review): numbering jumps in this chunk indicate missing lines,
# including the file-open for the corruption write (~2385/2386), the
# `def _wait():` header (~2435), and the cleanup callback wiring
# (~2459-2460) before flushLoggedErrors. Code preserved byte-for-byte.
2362 def test_share_corruption(self):
2363 self._poll_should_ignore_these_errors = [
2364 UnknownMutableContainerVersionError,
2365 UnknownImmutableContainerVersionError,
2367 basedir = "storage/LeaseCrawler/share_corruption"
2368 fileutil.make_dirs(basedir)
2369 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2370 w = StorageStatus(ss)
2371 # make it start sooner than usual.
2372 lc = ss.lease_checker
2373 lc.stop_after_first_bucket = True
2377 # create a few shares, with some leases on them
2378 self.make_shares(ss)
2380 # now corrupt one, and make sure the lease-checker keeps going
2381 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2382 first = min(self.sis)
2383 first_b32 = base32.b2a(first)
2384 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
# NOTE(review): the `f = open(fn, "rb+")` line (~2385/2386) is missing
# from this chunk; `f` is written to below.
2387 f.write("BAD MAGIC")
2389 # if get_share_file() doesn't see the correct mutable magic, it
2390 # assumes the file is an immutable share, and then
2391 # immutable.ShareFile sees a bad version. So regardless of which kind
2392 # of share we corrupted, this will trigger an
2393 # UnknownImmutableContainerVersionError.
2395 # also create an empty bucket
2396 empty_si = base32.b2a("\x04"*16)
2397 empty_bucket_dir = os.path.join(ss.sharedir,
2398 storage_index_to_dir(empty_si))
2399 fileutil.make_dirs(empty_bucket_dir)
2401 ss.setServiceParent(self.s)
2403 d = fireEventually()
2405 # now examine the state right after the first bucket has been
2407 def _after_first_bucket(ignored):
2409 if "cycle-to-date" not in s:
2410 d2 = fireEventually()
2411 d2.addCallback(_after_first_bucket)
2413 so_far = s["cycle-to-date"]
2414 rec = so_far["space-recovered"]
# The corrupted bucket was examined but its share could not be parsed,
# so examined-shares stays 0 and the SI lands in corrupt-shares.
2415 self.failUnlessEqual(rec["examined-buckets"], 1)
2416 self.failUnlessEqual(rec["examined-shares"], 0)
2417 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2418 d.addCallback(_after_first_bucket)
2420 d.addCallback(lambda ign: self.render_json(w))
2421 def _check_json(json):
2422 data = simplejson.loads(json)
2423 # grr. json turns all dict keys into strings.
2424 so_far = data["lease-checker"]["cycle-to-date"]
2425 corrupt_shares = so_far["corrupt-shares"]
2426 # it also turns all tuples into lists
2427 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2428 d.addCallback(_check_json)
2429 d.addCallback(lambda ign: self.render1(w))
2430 def _check_html(html):
2431 s = remove_tags(html)
2432 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2433 d.addCallback(_check_html)
2436 return bool(lc.get_state()["last-cycle-finished"] is not None)
2437 d.addCallback(lambda ign: self.poll(_wait))
2439 def _after_first_cycle(ignored):
2441 last = s["history"][0]
2442 rec = last["space-recovered"]
# Full cycle: 4 share buckets + 1 empty bucket examined; the 3 healthy
# shares parsed; the corrupt one stays listed.
2443 self.failUnlessEqual(rec["examined-buckets"], 5)
2444 self.failUnlessEqual(rec["examined-shares"], 3)
2445 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2446 d.addCallback(_after_first_cycle)
2447 d.addCallback(lambda ign: self.render_json(w))
2448 def _check_json_history(json):
2449 data = simplejson.loads(json)
2450 last = data["lease-checker"]["history"]["0"]
2451 corrupt_shares = last["corrupt-shares"]
2452 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2453 d.addCallback(_check_json_history)
2454 d.addCallback(lambda ign: self.render1(w))
2455 def _check_html_history(html):
2456 s = remove_tags(html)
2457 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2458 d.addCallback(_check_html_history)
# Silence the container-version errors that the deliberate corruption
# logged, so trial does not fail the test on them.
2461 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2462 UnknownImmutableContainerVersionError)
# Render the given status page with ?t=json and return the resulting
# Deferred. NOTE(review): the `return d` line (~2469) is missing from
# this chunk (numbering jumps 2468->2471).
2467 def render_json(self, page):
2468 d = self.render1(page, args={"t": ["json"]})
# Tests for the StorageStatus web page: HTML/JSON rendering, disk-stats
# failure handling, read-only mode, reserved space, and render helpers.
# NOTE(review): the embedded original-file numbering jumps throughout
# this class (e.g. 2471->2474, 2475->2477, 2489->2491), so the
# `def setUp(self):` / `def tearDown(self):` headers, several
# `return d` lines, and other statements are missing from this chunk.
# Code preserved byte-for-byte.
2471 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2474 self.s = service.MultiService()
2475 self.s.startService()
2477 return self.s.stopService()
# Rendering with no server present must show the "no server" banner.
2479 def test_no_server(self):
2480 w = StorageStatus(None)
2481 html = w.renderSynchronously()
2482 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
2484 def test_status(self):
2485 basedir = "storage/WebStatus/status"
2486 fileutil.make_dirs(basedir)
2487 ss = StorageServer(basedir, "\x00" * 20)
2488 ss.setServiceParent(self.s)
2489 w = StorageStatus(ss)
# NOTE(review): the assignment of `d` (presumably self.render1(w),
# ~2490) is missing from this chunk.
2491 def _check_html(html):
2492 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2493 s = remove_tags(html)
2494 self.failUnlessIn("Accepting new shares: Yes", s)
2495 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2496 d.addCallback(_check_html)
2497 d.addCallback(lambda ign: self.render_json(w))
2498 def _check_json(json):
2499 data = simplejson.loads(json)
# NOTE(review): the assignment of `s` (presumably data["stats"], ~2500)
# is missing from this chunk.
2501 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2502 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2503 self.failUnlessIn("bucket-counter", data)
2504 self.failUnlessIn("lease-checker", data)
2505 d.addCallback(_check_json)
# Same ?t=json render helper as on the LeaseCrawler class; the
# `return d` line (~2510) is missing here as well.
2508 def render_json(self, page):
2509 d = self.render1(page, args={"t": ["json"]})
# Platforms with no disk-stats API (AttributeError): page shows "?" for
# space and get_available_space() is None, but shares are still accepted.
2512 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2513 def test_status_no_disk_stats(self, mock_get_disk_stats):
2514 mock_get_disk_stats.side_effect = AttributeError()
2516 # Some platforms may have no disk stats API. Make sure the code can handle that
2517 # (test runs on all platforms).
2518 basedir = "storage/WebStatus/status_no_disk_stats"
2519 fileutil.make_dirs(basedir)
2520 ss = StorageServer(basedir, "\x00" * 20)
2521 ss.setServiceParent(self.s)
2522 w = StorageStatus(ss)
2523 html = w.renderSynchronously()
2524 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2525 s = remove_tags(html)
2526 self.failUnlessIn("Accepting new shares: Yes", s)
2527 self.failUnlessIn("Total disk space: ?", s)
2528 self.failUnlessIn("Space Available to Tahoe: ?", s)
2529 self.failUnless(ss.get_available_space() is None)
# Disk-stats API exists but fails (OSError): refuse new shares and
# report 0 available space.
2531 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2532 def test_status_bad_disk_stats(self, mock_get_disk_stats):
2533 mock_get_disk_stats.side_effect = OSError()
2535 # If the API to get disk stats exists but a call to it fails, then the status should
2536 # show that no shares will be accepted, and get_available_space() should be 0.
2537 basedir = "storage/WebStatus/status_bad_disk_stats"
2538 fileutil.make_dirs(basedir)
2539 ss = StorageServer(basedir, "\x00" * 20)
2540 ss.setServiceParent(self.s)
2541 w = StorageStatus(ss)
2542 html = w.renderSynchronously()
2543 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2544 s = remove_tags(html)
2545 self.failUnlessIn("Accepting new shares: No", s)
2546 self.failUnlessIn("Total disk space: ?", s)
2547 self.failUnlessIn("Space Available to Tahoe: ?", s)
2548 self.failUnlessEqual(ss.get_available_space(), 0)
2550 def test_readonly(self):
2551 basedir = "storage/WebStatus/readonly"
2552 fileutil.make_dirs(basedir)
2553 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2554 ss.setServiceParent(self.s)
2555 w = StorageStatus(ss)
2556 html = w.renderSynchronously()
2557 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2558 s = remove_tags(html)
2559 self.failUnlessIn("Accepting new shares: No", s)
2561 def test_reserved(self):
2562 basedir = "storage/WebStatus/reserved"
2563 fileutil.make_dirs(basedir)
2564 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2565 ss.setServiceParent(self.s)
2566 w = StorageStatus(ss)
2567 html = w.renderSynchronously()
2568 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2569 s = remove_tags(html)
2570 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
# NOTE(review): test_huge_reserved duplicates test_reserved exactly
# (same basedir, same 10e6 value) — presumably it was meant to use a
# much larger reservation; verify against the original file.
2572 def test_huge_reserved(self):
2573 basedir = "storage/WebStatus/reserved"
2574 fileutil.make_dirs(basedir)
2575 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2576 ss.setServiceParent(self.s)
2577 w = StorageStatus(ss)
2578 html = w.renderSynchronously()
2579 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2580 s = remove_tags(html)
2581 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
# Unit tests for the stateless render helpers and remove_prefix.
2583 def test_util(self):
2584 w = StorageStatus(None)
2585 self.failUnlessEqual(w.render_space(None, None), "?")
2586 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2587 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2588 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2589 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2590 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)