1 import time, os.path, platform, stat, re, simplejson, struct
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.interfaces import BadWriteEnablerError
24 from allmydata.test.common import LoggingServiceParent
25 from allmydata.test.common_web import WebRenderingMixin
26 from allmydata.test.no_network import NoNetworkServer
27 from allmydata.web.storage import StorageStatus, remove_prefix
32 def __init__(self, ignore_disconnectors=False):
33 self.ignore = ignore_disconnectors
34 self.disconnectors = {}
35 def notifyOnDisconnect(self, f, *args, **kwargs):
39 self.disconnectors[m] = (f, args, kwargs)
41 def dontNotifyOnDisconnect(self, marker):
44 del self.disconnectors[marker]
46 class FakeStatsProvider:
47 def count(self, name, delta=1):
49 def register_producer(self, producer):
52 class Bucket(unittest.TestCase):
53 def make_workdir(self, name):
54 basedir = os.path.join("storage", "Bucket", name)
55 incoming = os.path.join(basedir, "tmp", "bucket")
56 final = os.path.join(basedir, "bucket")
57 fileutil.make_dirs(basedir)
58 fileutil.make_dirs(os.path.join(basedir, "tmp"))
59 return incoming, final
61 def bucket_writer_closed(self, bw, consumed):
63 def add_latency(self, category, latency):
65 def count(self, name, delta=1):
70 renew_secret = os.urandom(32)
71 cancel_secret = os.urandom(32)
72 expiration_time = time.time() + 5000
73 return LeaseInfo(owner_num, renew_secret, cancel_secret,
74 expiration_time, "\x00" * 20)
76 def test_create(self):
77 incoming, final = self.make_workdir("test_create")
78 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
80 bw.remote_write(0, "a"*25)
81 bw.remote_write(25, "b"*25)
82 bw.remote_write(50, "c"*25)
83 bw.remote_write(75, "d"*7)
86 def test_readwrite(self):
87 incoming, final = self.make_workdir("test_readwrite")
88 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
90 bw.remote_write(0, "a"*25)
91 bw.remote_write(25, "b"*25)
92 bw.remote_write(50, "c"*7) # last block may be short
96 br = BucketReader(self, bw.finalhome)
97 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
98 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
99 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
103 def callRemote(self, methname, *args, **kwargs):
105 meth = getattr(self.target, "remote_" + methname)
106 return meth(*args, **kwargs)
107 return defer.maybeDeferred(_call)
109 class BucketProxy(unittest.TestCase):
110 def make_bucket(self, name, size):
111 basedir = os.path.join("storage", "BucketProxy", name)
112 incoming = os.path.join(basedir, "tmp", "bucket")
113 final = os.path.join(basedir, "bucket")
114 fileutil.make_dirs(basedir)
115 fileutil.make_dirs(os.path.join(basedir, "tmp"))
116 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
122 def make_lease(self):
124 renew_secret = os.urandom(32)
125 cancel_secret = os.urandom(32)
126 expiration_time = time.time() + 5000
127 return LeaseInfo(owner_num, renew_secret, cancel_secret,
128 expiration_time, "\x00" * 20)
130 def bucket_writer_closed(self, bw, consumed):
132 def add_latency(self, category, latency):
134 def count(self, name, delta=1):
137 def test_create(self):
138 bw, rb, sharefname = self.make_bucket("test_create", 500)
139 bp = WriteBucketProxy(rb, None,
144 uri_extension_size_max=500)
145 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
147 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
148 # Let's pretend each share has 100 bytes of data, and that there are
149 # 4 segments (25 bytes each), and 8 shares total. So the two
150 # per-segment merkle trees (crypttext_hash_tree,
151 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
152 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
153 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
154 # long. That should make the whole share:
156 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
157 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
159 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
161 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
163 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
165 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
167 uri_extension = "s" + "E"*498 + "e"
169 bw, rb, sharefname = self.make_bucket(name, sharesize)
170 bp = wbp_class(rb, None,
175 uri_extension_size_max=len(uri_extension))
178 d.addCallback(lambda res: bp.put_block(0, "a"*25))
179 d.addCallback(lambda res: bp.put_block(1, "b"*25))
180 d.addCallback(lambda res: bp.put_block(2, "c"*25))
181 d.addCallback(lambda res: bp.put_block(3, "d"*20))
182 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
183 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
184 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
185 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
186 d.addCallback(lambda res: bp.close())
188 # now read everything back
189 def _start_reading(res):
190 br = BucketReader(self, sharefname)
193 server = NoNetworkServer("abc", None)
194 rbp = rbp_class(rb, server, storage_index="")
195 self.failUnlessIn("to peer", repr(rbp))
196 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
198 d1 = rbp.get_block_data(0, 25, 25)
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
200 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
201 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
202 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
203 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
204 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
205 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
207 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
208 d1.addCallback(lambda res:
209 self.failUnlessEqual(res, crypttext_hashes))
210 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
211 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
212 d1.addCallback(lambda res: rbp.get_share_hashes())
213 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
214 d1.addCallback(lambda res: rbp.get_uri_extension())
215 d1.addCallback(lambda res:
216 self.failUnlessEqual(res, uri_extension))
220 d.addCallback(_start_reading)
224 def test_readwrite_v1(self):
225 return self._do_test_readwrite("test_readwrite_v1",
226 0x24, WriteBucketProxy, ReadBucketProxy)
228 def test_readwrite_v2(self):
229 return self._do_test_readwrite("test_readwrite_v2",
230 0x44, WriteBucketProxy_v2, ReadBucketProxy)
232 class Server(unittest.TestCase):
235 self.sparent = LoggingServiceParent()
236 self.sparent.startService()
237 self._lease_secret = itertools.count()
239 return self.sparent.stopService()
241 def workdir(self, name):
242 basedir = os.path.join("storage", "Server", name)
245 def create(self, name, reserved_space=0, klass=StorageServer):
246 workdir = self.workdir(name)
247 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
248 stats_provider=FakeStatsProvider())
249 ss.setServiceParent(self.sparent)
252 def test_create(self):
253 self.create("test_create")
255 def allocate(self, ss, storage_index, sharenums, size, canary=None):
256 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
257 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259 canary = FakeCanary()
260 return ss.remote_allocate_buckets(storage_index,
261 renew_secret, cancel_secret,
262 sharenums, size, canary)
264 def test_large_share(self):
265 syslow = platform.system().lower()
266 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
267 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
269 avail = fileutil.get_available_space('.', 512*2**20)
271 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
273 ss = self.create("test_large_share")
275 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
276 self.failUnlessEqual(already, set())
277 self.failUnlessEqual(set(writers.keys()), set([0]))
279 shnum, bucket = writers.items()[0]
280 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
281 bucket.remote_write(2**32, "ab")
282 bucket.remote_close()
284 readers = ss.remote_get_buckets("allocate")
285 reader = readers[shnum]
286 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
288 def test_dont_overfill_dirs(self):
290 This test asserts that if you add a second share whose storage index
291 share lots of leading bits with an extant share (but isn't the exact
292 same storage index), this won't add an entry to the share directory.
294 ss = self.create("test_dont_overfill_dirs")
295 already, writers = self.allocate(ss, "storageindex", [0], 10)
296 for i, wb in writers.items():
297 wb.remote_write(0, "%10d" % i)
299 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
301 children_of_storedir = set(os.listdir(storedir))
303 # Now store another one under another storageindex that has leading
304 # chars the same as the first storageindex.
305 already, writers = self.allocate(ss, "storageindey", [0], 10)
306 for i, wb in writers.items():
307 wb.remote_write(0, "%10d" % i)
309 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
311 new_children_of_storedir = set(os.listdir(storedir))
312 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
314 def test_remove_incoming(self):
315 ss = self.create("test_remove_incoming")
316 already, writers = self.allocate(ss, "vid", range(3), 10)
317 for i,wb in writers.items():
318 wb.remote_write(0, "%10d" % i)
320 incoming_share_dir = wb.incominghome
321 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
322 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
323 incoming_dir = os.path.dirname(incoming_prefix_dir)
324 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
325 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
326 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
328 def test_abort(self):
329 # remote_abort, when called on a writer, should make sure that
330 # the allocated size of the bucket is not counted by the storage
331 # server when accounting for space.
332 ss = self.create("test_abort")
333 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
334 self.failIfEqual(ss.allocated_size(), 0)
336 # Now abort the writers.
337 for writer in writers.itervalues():
338 writer.remote_abort()
339 self.failUnlessEqual(ss.allocated_size(), 0)
342 def test_allocate(self):
343 ss = self.create("test_allocate")
345 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
347 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
348 self.failUnlessEqual(already, set())
349 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
351 # while the buckets are open, they should not count as readable
352 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
355 for i,wb in writers.items():
356 wb.remote_write(0, "%25d" % i)
358 # aborting a bucket that was already closed is a no-op
361 # now they should be readable
362 b = ss.remote_get_buckets("allocate")
363 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
364 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
366 self.failUnlessIn("BucketReader", b_str)
367 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
369 # now if we ask about writing again, the server should offer those
370 # three buckets as already present. It should offer them even if we
371 # don't ask about those specific ones.
372 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
373 self.failUnlessEqual(already, set([0,1,2]))
374 self.failUnlessEqual(set(writers.keys()), set([3,4]))
376 # while those two buckets are open for writing, the server should
377 # refuse to offer them to uploaders
379 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
380 self.failUnlessEqual(already2, set([0,1,2]))
381 self.failUnlessEqual(set(writers2.keys()), set([5]))
383 # aborting the writes should remove the tempfiles
384 for i,wb in writers2.items():
386 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
387 self.failUnlessEqual(already2, set([0,1,2]))
388 self.failUnlessEqual(set(writers2.keys()), set([5]))
390 for i,wb in writers2.items():
392 for i,wb in writers.items():
395 def test_bad_container_version(self):
396 ss = self.create("test_bad_container_version")
397 a,w = self.allocate(ss, "si1", [0], 10)
398 w[0].remote_write(0, "\xff"*10)
401 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
404 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
407 ss.remote_get_buckets("allocate")
409 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
410 ss.remote_get_buckets, "si1")
411 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
413 def test_disconnect(self):
414 # simulate a disconnection
415 ss = self.create("test_disconnect")
416 canary = FakeCanary()
417 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
418 self.failUnlessEqual(already, set())
419 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
420 for (f,args,kwargs) in canary.disconnectors.values():
425 # that ought to delete the incoming shares
426 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
427 self.failUnlessEqual(already, set())
428 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
430 @mock.patch('allmydata.util.fileutil.get_disk_stats')
431 def test_reserved_space(self, mock_get_disk_stats):
433 mock_get_disk_stats.return_value = {
434 'free_for_nonroot': 15000,
435 'avail': max(15000 - reserved_space, 0),
438 ss = self.create("test_reserved_space", reserved_space=reserved_space)
439 # 15k available, 10k reserved, leaves 5k for shares
441 # a newly created and filled share incurs this much overhead, beyond
442 # the size we request.
444 LEASE_SIZE = 4+32+32+4
445 canary = FakeCanary(True)
446 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
447 self.failUnlessEqual(len(writers), 3)
448 # now the StorageServer should have 3000 bytes provisionally
449 # allocated, allowing only 2000 more to be claimed
450 self.failUnlessEqual(len(ss._active_writers), 3)
452 # allocating 1001-byte shares only leaves room for one
453 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
454 self.failUnlessEqual(len(writers2), 1)
455 self.failUnlessEqual(len(ss._active_writers), 4)
457 # we abandon the first set, so their provisional allocation should be
461 self.failUnlessEqual(len(ss._active_writers), 1)
462 # now we have a provisional allocation of 1001 bytes
464 # and we close the second set, so their provisional allocation should
465 # become real, long-term allocation, and grows to include the
467 for bw in writers2.values():
468 bw.remote_write(0, "a"*25)
473 self.failUnlessEqual(len(ss._active_writers), 0)
475 allocated = 1001 + OVERHEAD + LEASE_SIZE
477 # we have to manually increase available, since we're not doing real
479 mock_get_disk_stats.return_value = {
480 'free_for_nonroot': 15000 - allocated,
481 'avail': max(15000 - allocated - reserved_space, 0),
484 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
485 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
486 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
487 self.failUnlessEqual(len(writers3), 39)
488 self.failUnlessEqual(len(ss._active_writers), 39)
492 self.failUnlessEqual(len(ss._active_writers), 0)
493 ss.disownServiceParent()
497 basedir = self.workdir("test_seek_behavior")
498 fileutil.make_dirs(basedir)
499 filename = os.path.join(basedir, "testfile")
500 f = open(filename, "wb")
503 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
504 # files. mode="a" preserves previous contents but does not allow
505 # seeking-to-create-holes. mode="r+" allows both.
506 f = open(filename, "rb+")
510 filelen = os.stat(filename)[stat.ST_SIZE]
511 self.failUnlessEqual(filelen, 100+3)
512 f2 = open(filename, "rb")
513 self.failUnlessEqual(f2.read(5), "start")
516 def test_leases(self):
517 ss = self.create("test_leases")
518 canary = FakeCanary()
522 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
523 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
524 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
525 sharenums, size, canary)
526 self.failUnlessEqual(len(already), 0)
527 self.failUnlessEqual(len(writers), 5)
528 for wb in writers.values():
531 leases = list(ss.get_leases("si0"))
532 self.failUnlessEqual(len(leases), 1)
533 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
535 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
536 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
537 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
538 sharenums, size, canary)
539 for wb in writers.values():
542 # take out a second lease on si1
543 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
544 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
545 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
546 sharenums, size, canary)
547 self.failUnlessEqual(len(already), 5)
548 self.failUnlessEqual(len(writers), 0)
550 leases = list(ss.get_leases("si1"))
551 self.failUnlessEqual(len(leases), 2)
552 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
554 # and a third lease, using add-lease
555 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
556 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
557 ss.remote_add_lease("si1", rs2a, cs2a)
558 leases = list(ss.get_leases("si1"))
559 self.failUnlessEqual(len(leases), 3)
560 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
562 # add-lease on a missing storage index is silently ignored
563 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
565 # check that si0 is readable
566 readers = ss.remote_get_buckets("si0")
567 self.failUnlessEqual(len(readers), 5)
569 # renew the first lease. Only the proper renew_secret should work
570 ss.remote_renew_lease("si0", rs0)
571 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
572 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
574 # check that si0 is still readable
575 readers = ss.remote_get_buckets("si0")
576 self.failUnlessEqual(len(readers), 5)
579 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
580 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
581 ss.remote_cancel_lease("si0", cs0)
583 # si0 should now be gone
584 readers = ss.remote_get_buckets("si0")
585 self.failUnlessEqual(len(readers), 0)
586 # and the renew should no longer work
587 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
590 # cancel the first lease on si1, leaving the second and third in place
591 ss.remote_cancel_lease("si1", cs1)
592 readers = ss.remote_get_buckets("si1")
593 self.failUnlessEqual(len(readers), 5)
594 # the corresponding renew should no longer work
595 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
597 leases = list(ss.get_leases("si1"))
598 self.failUnlessEqual(len(leases), 2)
599 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
601 ss.remote_renew_lease("si1", rs2)
602 # cancelling the second and third should make it go away
603 ss.remote_cancel_lease("si1", cs2)
604 ss.remote_cancel_lease("si1", cs2a)
605 readers = ss.remote_get_buckets("si1")
606 self.failUnlessEqual(len(readers), 0)
607 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
608 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
609 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
611 leases = list(ss.get_leases("si1"))
612 self.failUnlessEqual(len(leases), 0)
615 # test overlapping uploads
616 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
617 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
618 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
619 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
620 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
621 sharenums, size, canary)
622 self.failUnlessEqual(len(already), 0)
623 self.failUnlessEqual(len(writers), 5)
624 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
625 sharenums, size, canary)
626 self.failUnlessEqual(len(already2), 0)
627 self.failUnlessEqual(len(writers2), 0)
628 for wb in writers.values():
631 leases = list(ss.get_leases("si3"))
632 self.failUnlessEqual(len(leases), 1)
634 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
635 sharenums, size, canary)
636 self.failUnlessEqual(len(already3), 5)
637 self.failUnlessEqual(len(writers3), 0)
639 leases = list(ss.get_leases("si3"))
640 self.failUnlessEqual(len(leases), 2)
642 def test_readonly(self):
643 workdir = self.workdir("test_readonly")
644 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
645 ss.setServiceParent(self.sparent)
647 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
648 self.failUnlessEqual(already, set())
649 self.failUnlessEqual(writers, {})
651 stats = ss.get_stats()
652 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
653 if "storage_server.disk_avail" in stats:
654 # Some platforms may not have an API to get disk stats.
655 # But if there are stats, readonly_storage means disk_avail=0
656 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
658 def test_discard(self):
659 # discard is really only used for other tests, but we test it anyways
660 workdir = self.workdir("test_discard")
661 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
662 ss.setServiceParent(self.sparent)
664 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
665 self.failUnlessEqual(already, set())
666 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
667 for i,wb in writers.items():
668 wb.remote_write(0, "%25d" % i)
670 # since we discard the data, the shares should be present but sparse.
671 # Since we write with some seeks, the data we read back will be all
673 b = ss.remote_get_buckets("vid")
674 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
675 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
677 def test_advise_corruption(self):
678 workdir = self.workdir("test_advise_corruption")
679 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
680 ss.setServiceParent(self.sparent)
682 si0_s = base32.b2a("si0")
683 ss.remote_advise_corrupt_share("immutable", "si0", 0,
684 "This share smells funny.\n")
685 reportdir = os.path.join(workdir, "corruption-advisories")
686 reports = os.listdir(reportdir)
687 self.failUnlessEqual(len(reports), 1)
688 report_si0 = reports[0]
689 self.failUnlessIn(si0_s, report_si0)
690 f = open(os.path.join(reportdir, report_si0), "r")
693 self.failUnlessIn("type: immutable", report)
694 self.failUnlessIn("storage_index: %s" % si0_s, report)
695 self.failUnlessIn("share_number: 0", report)
696 self.failUnlessIn("This share smells funny.", report)
698 # test the RIBucketWriter version too
699 si1_s = base32.b2a("si1")
700 already,writers = self.allocate(ss, "si1", [1], 75)
701 self.failUnlessEqual(already, set())
702 self.failUnlessEqual(set(writers.keys()), set([1]))
703 writers[1].remote_write(0, "data")
704 writers[1].remote_close()
706 b = ss.remote_get_buckets("si1")
707 self.failUnlessEqual(set(b.keys()), set([1]))
708 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
710 reports = os.listdir(reportdir)
711 self.failUnlessEqual(len(reports), 2)
712 report_si1 = [r for r in reports if si1_s in r][0]
713 f = open(os.path.join(reportdir, report_si1), "r")
716 self.failUnlessIn("type: immutable", report)
717 self.failUnlessIn("storage_index: %s" % si1_s, report)
718 self.failUnlessIn("share_number: 1", report)
719 self.failUnlessIn("This share tastes like dust.", report)
723 class MutableServer(unittest.TestCase):
726 self.sparent = LoggingServiceParent()
727 self._lease_secret = itertools.count()
729 return self.sparent.stopService()
731 def workdir(self, name):
732 basedir = os.path.join("storage", "MutableServer", name)
735 def create(self, name):
736 workdir = self.workdir(name)
737 ss = StorageServer(workdir, "\x00" * 20)
738 ss.setServiceParent(self.sparent)
741 def test_create(self):
742 self.create("test_create")
744 def write_enabler(self, we_tag):
745 return hashutil.tagged_hash("we_blah", we_tag)
747 def renew_secret(self, tag):
748 return hashutil.tagged_hash("renew_blah", str(tag))
750 def cancel_secret(self, tag):
751 return hashutil.tagged_hash("cancel_blah", str(tag))
753 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
754 write_enabler = self.write_enabler(we_tag)
755 renew_secret = self.renew_secret(lease_tag)
756 cancel_secret = self.cancel_secret(lease_tag)
757 rstaraw = ss.remote_slot_testv_and_readv_and_writev
758 testandwritev = dict( [ (shnum, ([], [], None) )
759 for shnum in sharenums ] )
761 rc = rstaraw(storage_index,
762 (write_enabler, renew_secret, cancel_secret),
765 (did_write, readv_data) = rc
766 self.failUnless(did_write)
767 self.failUnless(isinstance(readv_data, dict))
768 self.failUnlessEqual(len(readv_data), 0)
770 def test_bad_magic(self):
771 ss = self.create("test_bad_magic")
772 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
773 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
778 read = ss.remote_slot_readv
779 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
780 read, "si1", [0], [(0,10)])
781 self.failUnlessIn(" had magic ", str(e))
782 self.failUnlessIn(" but we wanted ", str(e))
784 def test_container_size(self):
785 ss = self.create("test_container_size")
786 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
788 read = ss.remote_slot_readv
789 rstaraw = ss.remote_slot_testv_and_readv_and_writev
790 secrets = ( self.write_enabler("we1"),
791 self.renew_secret("we1"),
792 self.cancel_secret("we1") )
793 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
794 answer = rstaraw("si1", secrets,
795 {0: ([], [(0,data)], len(data)+12)},
797 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
799 # trying to make the container too large will raise an exception
800 TOOBIG = MutableShareFile.MAX_SIZE + 10
801 self.failUnlessRaises(DataTooLargeError,
802 rstaraw, "si1", secrets,
803 {0: ([], [(0,data)], TOOBIG)},
806 # it should be possible to make the container smaller, although at
807 # the moment this doesn't actually affect the share, unless the
808 # container size is dropped to zero, in which case the share is
810 answer = rstaraw("si1", secrets,
811 {0: ([], [(0,data)], len(data)+8)},
813 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
815 answer = rstaraw("si1", secrets,
816 {0: ([], [(0,data)], 0)},
818 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
820 read_answer = read("si1", [0], [(0,10)])
821 self.failUnlessEqual(read_answer, {})
823 def test_allocate(self):
824 ss = self.create("test_allocate")
825 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
828 read = ss.remote_slot_readv
829 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
831 self.failUnlessEqual(read("si1", [], [(0, 10)]),
832 {0: [""], 1: [""], 2: [""]})
833 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
837 secrets = ( self.write_enabler("we1"),
838 self.renew_secret("we1"),
839 self.cancel_secret("we1") )
840 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
841 write = ss.remote_slot_testv_and_readv_and_writev
842 answer = write("si1", secrets,
843 {0: ([], [(0,data)], None)},
845 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
847 self.failUnlessEqual(read("si1", [0], [(0,20)]),
848 {0: ["00000000001111111111"]})
849 self.failUnlessEqual(read("si1", [0], [(95,10)]),
851 #self.failUnlessEqual(s0.remote_get_length(), 100)
853 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
854 f = self.failUnlessRaises(BadWriteEnablerError,
855 write, "si1", bad_secrets,
857 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
859 # this testv should fail
860 answer = write("si1", secrets,
861 {0: ([(0, 12, "eq", "444444444444"),
862 (20, 5, "eq", "22222"),
869 self.failUnlessEqual(answer, (False,
870 {0: ["000000000011", "22222"],
874 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
877 answer = write("si1", secrets,
878 {0: ([(10, 5, "lt", "11111"),
885 self.failUnlessEqual(answer, (False,
890 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_operators(self):
        """Exercise the testv comparison operators (lt/le/eq/ne/ge/gt).

        Each operator is probed against the 5 bytes at offset 10 of share
        0 (which hold "11111"), with operands below, equal to, and above
        that value.  After every conditional write the share contents are
        read back to confirm whether the write was applied (contents become
        "y"*100) or refused (contents stay equal to `data`).
        """
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # "0000000000111111111122..." -- bytes [10:15] are "11111"
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        write("si1", secrets,
              {0: ([], [(0,data)], None)},
        # lt: "11111" < "11110" is false, so the write must be refused
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        # lt against an equal operand: still false
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # lt against a larger operand: true, so the write happens
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # le: false / true / true
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # eq: false / true
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # ne: false / true
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        # ge: true / true / false
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # gt: true / false / false
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_readv(self):
        """Write three shares to one slot and read them all back at once.

        A readv with an empty sharenum list ([]) means "all shares"; the
        answer maps each share number to its requested byte ranges.
        """
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # NOTE(review): this string value of `data` is immediately shadowed
        # by the list assignment below and is never used -- dead assignment.
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        # share k holds 100 bytes of the digit "k"
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
        self.failUnlessEqual(rc, (True, {}))
        # [] sharenums == "read every share"
        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
    def compare_leases_without_timestamps(self, leases_a, leases_b):
        """Assert two lease lists are pairwise equal, ignoring expiration.

        Compares owner_num, renew_secret, cancel_secret and nodeid of each
        corresponding pair; expiration_time is deliberately NOT compared
        (use compare_leases() for that).
        """
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            self.failUnlessEqual(a.owner_num, b.owner_num)
            self.failUnlessEqual(a.renew_secret, b.renew_secret)
            self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
            self.failUnlessEqual(a.nodeid, b.nodeid)
    def compare_leases(self, leases_a, leases_b):
        """Assert two lease lists are pairwise equal, including timestamps.

        Strict variant of compare_leases_without_timestamps(): also checks
        that each pair's expiration_time matches exactly.
        """
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            self.failUnlessEqual(a.owner_num, b.owner_num)
            self.failUnlessEqual(a.renew_secret, b.renew_secret)
            self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
            self.failUnlessEqual(a.nodeid, b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)
    def test_leases(self):
        """Exercise lease add/renew/cancel on a mutable slot.

        Covers: leases surviving re-writes with the same secrets, growth of
        the lease list with new secrets, lease preservation across container
        expansion, error messages for bogus renew/cancel secrets, and share
        deletion once the last lease is cancelled.
        """
        ss = self.create("test_leases")
        # (elided here, presumably: a local helper `def secrets(n):` -- it is
        # called as secrets(0)..secrets(5) below; confirm against full source)
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))
        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)
        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
        # re-allocate the slots and use the same secrets, that should update
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)
        # renewing with the same secret must not add a second lease
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)
        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 6)
        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)
        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message
        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
        self.compare_leases(all_leases, list(s0.get_leases()))
        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))
        # writes (which expand the container) may rewrite lease records but
        # must preserve everything except the timestamps
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])
        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)
        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")
        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)
        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})
        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")
    def test_remove(self):
        """Delete shares one at a time by writing them to zero length.

        A writev with new_length 0 removes the share; removing the last
        share of a bucket must also remove the bucket directory (but not
        its two-letter prefix directory).
        """
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
        # the answer should mention all the shares that existed before the
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir), prefixdir)
        self.failIf(os.path.exists(bucketdir), bucketdir)
class Stats(unittest.TestCase):
    """Tests for the storage server's latency-statistics machinery."""
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        # NOTE(review): path component is "Server", not "Stats" -- possibly
        # copy/paste from the Server test class; confirm this is intended.
        basedir = os.path.join("storage", "Server", name)

    def create(self, name):
        # Build a StorageServer rooted in this test's workdir and attach it
        # to the logging service parent so it gets cleaned up in tearDown.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_latencies(self):
        """Feed known samples into add_latency and check get_latencies().

        Sample counts are chosen to exercise every percentile bucket:
        10000 "allocate" samples (only the last 1000 are retained), 1000
        "renew", then small "write"/"cancel"/"get" populations for which
        the high percentiles must be reported as None.
        """
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
            ss.add_latency("write", 1.0 * i)
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
        output = ss.get_latencies()
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))
        # only the most recent 1000 allocate samples (9000..9999) survive
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
        # with only 20 samples, the 99th/99.9th percentiles are undefined
        self.failUnlessEqual(len(ss.latencies["write"]), 20)
        self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
        self.failUnless(output["write"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
        self.failUnless(output["write"]["99_0_percentile"] is None, output)
        self.failUnless(output["write"]["99_9_percentile"] is None, output)
        # 10 samples: 95th percentile and up are undefined
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
        self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
        # a single sample: every statistic is reported as None
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(output["get"]["mean"] is None, output)
        self.failUnless(output["get"]["01_0_percentile"] is None, output)
        self.failUnless(output["get"]["10_0_percentile"] is None, output)
        self.failUnless(output["get"]["50_0_percentile"] is None, output)
        self.failUnless(output["get"]["90_0_percentile"] is None, output)
        self.failUnless(output["get"]["95_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_9_percentile"] is None, output)
1376 s = re.sub(r'<[^>]*>', ' ', s)
1377 s = re.sub(r'\s+', ' ', s)
class MyBucketCountingCrawler(BucketCountingCrawler):
    """BucketCountingCrawler that fires a queued Deferred per finished prefix.

    Tests preload `hook_ds` with Deferreds; each completed prefix pops the
    next one, letting the test synchronize with crawler progress.
    """
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is the hook-firing test crawler."""

    def add_bucket_counter(self):
        # Same wiring as the stock server, but install
        # MyBucketCountingCrawler so tests can observe each finished prefix.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Tests for the bucket-counting crawler and its web status rendering."""
        self.s = service.MultiService()
        self.s.startService()
        return self.s.stopService()

    def test_bucket_counter(self):
        """Watch a bucket-count cycle start, progress, and complete.

        Renders the storage-status page before the crawl starts, mid-cycle
        (after the first prefix), and after a full cycle.
        """
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)
        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                # not yet -- reschedule ourselves for the next reactor turn
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)
        # now give it enough time to complete a full cycle
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)

    def test_bucket_counter_cleanup(self):
        """Bogus mid-cycle state entries must be discarded at cycle end."""
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        d = fireEventually()
        def _after_first_prefix(ignored):
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                # not started yet -- poll again on the next reactor turn
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)
        # now give it enough time to complete a cycle
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            # the injected bogus entries must have been dropped
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)

    def test_bucket_counter_eta(self):
        """ETA appears on the status page once at least one prefix is done."""
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)
        d = defer.Deferred()
        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)
        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)
        ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """Lease crawler with a one-shot pause after its first bucket.

    While ``stop_after_first_bucket`` is set, processing a bucket clears
    the flag and drives ``cpu_slice`` negative; once the flag is cleared,
    ``yielding`` restores a generous slice so the rest of the cycle runs
    without further interruption.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # one-shot: consume the flag, then exhaust the current slice
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
1549 class BrokenStatResults:
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    # Copy every public stat field except st_blocks onto a
    # BrokenStatResults, simulating a platform whose stat results lack
    # st_blocks. (The enclosing method's signature is elided in this view;
    # `s` is presumably an os.stat result -- confirm against full source.)
        bsr = BrokenStatResults()
        for attrname in dir(s):
            if attrname.startswith("_"):
            if attrname == "st_blocks":
            setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    # Use the pausable crawler so tests can stop right after the first bucket.
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # Use the crawler whose stat results omit st_blocks.
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1568 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1571 self.s = service.MultiService()
1572 self.s.startService()
1574 return self.s.stopService()
    def make_shares(self, ss):
        """Populate `ss` with two immutable and two mutable shares.

        immutable_si_1 and mutable_si_3 each get a second lease. Records
        the storage indexes and all renew/cancel secrets on self.sis,
        self.renew_secrets and self.cancel_secrets for later tests.
        """
        # (elided here, presumably: `def make(si):` -- used for the two
        # immutable shares below; confirm against full source)
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si))
        def make_mutable(si):
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si),
                    hashutil.tagged_hash("write-enabler", si))
        def make_extra_lease(si, num):
            return (hashutil.tagged_hash("renew-%d" % num, si),
                    hashutil.tagged_hash("cancel-%d" % num, si))
        immutable_si_0, rs0, cs0 = make("\x00" * 16)
        immutable_si_1, rs1, cs1 = make("\x01" * 16)
        rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
        mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
        mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
        rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
        canary = FakeCanary()
        # note: 'tahoe debug dump-share' will not handle this file, since the
        # inner contents are not a valid CHK share
        data = "\xff" * 1000
        a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
        w[0].remote_write(0, data)
        a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
        w[0].remote_write(0, data)
        ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
        writev = ss.remote_slot_testv_and_readv_and_writev
        writev(mutable_si_2, (we2, rs2, cs2),
               {0: ([], [(0,data)], len(data))}, [])
        writev(mutable_si_3, (we3, rs3, cs3),
               {0: ([], [(0,data)], len(data))}, [])
        ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
        self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
        self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
        self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
    def test_basic(self):
        """Run one full lease-crawler cycle with expiration disabled.

        Checks crawler state before the cycle, mid-cycle (paused after the
        first bucket), and after completion; also checks the HTML and JSON
        status renderings. No leases expire, so nothing may be recovered.
        """
        basedir = "storage/LeaseCrawler/basic"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)
        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # add a non-sharefile to exercise another code path
        fn = os.path.join(ss.sharedir,
                          storage_index_to_dir(immutable_si_0),
        f.write("I am not a share.\n")
        # this is before the crawl has started, so we're not in a cycle yet
        initial_state = lc.get_state()
        self.failIf(lc.get_progress()["cycle-in-progress"])
        self.failIfIn("cycle-to-date", initial_state)
        self.failIfIn("estimated-remaining-cycle", initial_state)
        self.failIfIn("estimated-current-cycle", initial_state)
        self.failUnlessIn("history", initial_state)
        self.failUnlessEqual(initial_state["history"], {})
        ss.setServiceParent(self.s)
        d = fireEventually()
        # now examine the state right after the first bucket has been
        def _after_first_bucket(ignored):
            initial_state = lc.get_state()
            if "cycle-to-date" not in initial_state:
                # cycle hasn't begun yet -- poll again next reactor turn
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
            self.failUnlessIn("cycle-to-date", initial_state)
            self.failUnlessIn("estimated-remaining-cycle", initial_state)
            self.failUnlessIn("estimated-current-cycle", initial_state)
            self.failUnlessIn("history", initial_state)
            self.failUnlessEqual(initial_state["history"], {})
            so_far = initial_state["cycle-to-date"]
            self.failUnlessEqual(so_far["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", so_far)
            self.failUnlessIn("lease-age-histogram", so_far)
            lah = so_far["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            # one bucket seen so far: one lease, aged less than a day
            self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
            self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
            self.failUnlessEqual(so_far["corrupt-shares"], [])
            sr1 = so_far["space-recovered"]
            self.failUnlessEqual(sr1["examined-buckets"], 1)
            self.failUnlessEqual(sr1["examined-shares"], 1)
            self.failUnlessEqual(sr1["actual-shares"], 0)
            self.failUnlessEqual(sr1["configured-diskbytes"], 0)
            self.failUnlessEqual(sr1["original-sharebytes"], 0)
            left = initial_state["estimated-remaining-cycle"]
            sr2 = left["space-recovered"]
            self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
            self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
            self.failIfEqual(sr2["actual-shares"], None)
            self.failIfEqual(sr2["configured-diskbytes"], None)
            self.failIfEqual(sr2["original-sharebytes"], None)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            self.failUnlessIn("So far, this cycle has examined "
                              "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
            self.failUnlessIn("and has recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable), "
                              "0 B (0 B / 0 B)", s)
            self.failUnlessIn("If expiration were enabled, "
                              "we would have recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B) by now", s)
            self.failUnlessIn("and the remainder of this cycle "
                              "would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("and the whole cycle would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("if we were strictly using each lease's default "
                              "31-day lease lifetime", s)
            self.failUnlessIn("this cycle would be expected to recover: ", s)
        d.addCallback(_check_html_in_cycle)
        # wait for the crawler to finish the first cycle. Nothing should have
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))
        def _after_first_cycle(ignored):
            # cycle-local keys must be gone once the cycle has finished
            self.failIf("cycle-to-date" in s)
            self.failIf("estimated-remaining-cycle" in s)
            self.failIf("estimated-current-cycle" in s)
            last = s["history"][0]
            self.failUnlessIn("cycle-start-finish-times", last)
            self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
            self.failUnlessEqual(last["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", last)
            self.failUnlessIn("lease-age-histogram", last)
            lah = last["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            # all six leases seen, all aged under a day
            self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
            self.failUnlessEqual(last["corrupt-shares"], [])
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            # expiration is disabled, so nothing may have been recovered
            self.failUnlessEqual(rec["actual-buckets"], 0)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 0)
            self.failUnlessEqual(rec["actual-shares"], 0)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 0)
            self.failUnlessEqual(rec["actual-diskbytes"], 0)
            self.failUnlessEqual(rec["original-diskbytes"], 0)
            self.failUnlessEqual(rec["configured-diskbytes"], 0)
            self.failUnlessEqual(rec["actual-sharebytes"], 0)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], 0)
            def _get_sharefile(si):
                return list(ss._iter_share_files(si))[0]
            def count_leases(si):
                return len(list(_get_sharefile(si).get_leases()))
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("recovered: 0 shares, 0 buckets "
                              "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
            self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
                              "(2 mutable / 2 immutable),", s)
            self.failUnlessIn("but expiration was not enabled", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(webstatus))
        def _check_json(json):
            data = simplejson.loads(json)
            self.failUnlessIn("lease-checker", data)
            self.failUnlessIn("lease-checker-progress", data)
        d.addCallback(_check_json)
    def backdate_lease(self, sf, renew_secret, new_expire_time):
        """Force a lease's expiration_time backwards, bypassing renew_lease.

        Raises IndexError if no lease on `sf` matches `renew_secret`.
        """
        # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
        # "renew" a lease with a new_expire_time that is older than what the
        # current lease has), so we have to reach inside it.
        for i,lease in enumerate(sf.get_leases()):
            if lease.renew_secret == renew_secret:
                lease.expiration_time = new_expire_time
                # NOTE(review): file handle closing is elided from this view;
                # presumably f.close() follows -- confirm against full source.
                f = open(sf.home, 'rb+')
                sf._write_lease_record(f, i, lease)
        raise IndexError("unable to renew non-existent lease")
# Exercise the lease crawler in "age" mode: any lease older than the
# configured override duration (2000s) is expired, and a share whose last
# lease expires is deleted.  (This listing elides some original lines.)
1803 def test_expire_age(self):
1804 basedir = "storage/LeaseCrawler/expire_age"
1805 fileutil.make_dirs(basedir)
1806 # setting expiration_time to 2000 means that any lease which is more
1807 # than 2000s old will be expired.
1808 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1809 expiration_enabled=True,
1810 expiration_mode="age",
1811 expiration_override_lease_duration=2000)
1812 # make it start sooner than usual.
1813 lc = ss.lease_checker
1815 lc.stop_after_first_bucket = True
1816 webstatus = StorageStatus(ss)
1818 # create a few shares, with some leases on them
1819 self.make_shares(ss)
1820 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# small helpers over the server's share iterator
1822 def count_shares(si):
1823 return len(list(ss._iter_share_files(si)))
1824 def _get_sharefile(si):
1825 return list(ss._iter_share_files(si))[0]
1826 def count_leases(si):
1827 return len(list(_get_sharefile(si).get_leases()))
# baseline: four buckets with one share each; si_1 and si_3 carry two leases
1829 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1830 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1831 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1832 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1833 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1834 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1835 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1836 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1838 # artificially crank back the expiration time on the first lease of
1839 # each share, to make it look like it expired already (age=1000s).
1840 # Some shares have an extra lease which is set to expire at the
1841 # default time in 31 days from now (age=31days). We then run the
1842 # crawler, which will expire the first lease, making some shares get
1843 # deleted and others stay alive (with one remaining lease)
# NOTE(review): 'now' is presumably time.time() captured just above
# (elided from this listing) -- confirm against the full file.
1846 sf0 = _get_sharefile(immutable_si_0)
1847 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1848 sf0_size = os.stat(sf0.home).st_size
1850 # immutable_si_1 gets an extra lease
1851 sf1 = _get_sharefile(immutable_si_1)
1852 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1854 sf2 = _get_sharefile(mutable_si_2)
1855 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1856 sf2_size = os.stat(sf2.home).st_size
1858 # mutable_si_3 gets an extra lease
1859 sf3 = _get_sharefile(mutable_si_3)
1860 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1862 ss.setServiceParent(self.s)
1864 d = fireEventually()
1865 # examine the state right after the first bucket has been processed
1866 def _after_first_bucket(ignored):
1867 p = lc.get_progress()
1868 if not p["cycle-in-progress"]:
# crawler cycle hasn't started yet -- re-schedule ourselves and poll again
1869 d2 = fireEventually()
1870 d2.addCallback(_after_first_bucket)
1872 d.addCallback(_after_first_bucket)
1873 d.addCallback(lambda ign: self.render1(webstatus))
1874 def _check_html_in_cycle(html):
1875 s = remove_tags(html)
1876 # the first bucket encountered gets deleted, and its prefix
1877 # happens to be about 1/5th of the way through the ring, so the
1878 # predictor thinks we'll have 5 shares and that we'll delete them
1879 # all. This part of the test depends upon the SIs landing right
1880 # where they do now.
1881 self.failUnlessIn("The remainder of this cycle is expected to "
1882 "recover: 4 shares, 4 buckets", s)
1883 self.failUnlessIn("The whole cycle is expected to examine "
1884 "5 shares in 5 buckets and to recover: "
1885 "5 shares, 5 buckets", s)
1886 d.addCallback(_check_html_in_cycle)
1888 # wait for the crawler to finish the first cycle. Two shares should
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
1891 return bool(lc.get_state()["last-cycle-finished"] is not None)
1892 d.addCallback(lambda ign: self.poll(_wait))
1894 def _after_first_cycle(ignored):
# si_0 and si_2 lost their only lease and were deleted; si_1 and si_3
# survive with just the 31-day lease remaining
1895 self.failUnlessEqual(count_shares(immutable_si_0), 0)
1896 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1897 self.failUnlessEqual(count_leases(immutable_si_1), 1)
1898 self.failUnlessEqual(count_shares(mutable_si_2), 0)
1899 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1900 self.failUnlessEqual(count_leases(mutable_si_3), 1)
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
1903 last = s["history"][0]
1905 self.failUnlessEqual(last["expiration-enabled"], True)
1906 self.failUnlessEqual(last["configured-expiration-mode"],
1907 ("age", 2000, None, ("mutable", "immutable")))
1908 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1910 rec = last["space-recovered"]
1911 self.failUnlessEqual(rec["examined-buckets"], 4)
1912 self.failUnlessEqual(rec["examined-shares"], 4)
1913 self.failUnlessEqual(rec["actual-buckets"], 2)
1914 self.failUnlessEqual(rec["original-buckets"], 2)
1915 self.failUnlessEqual(rec["configured-buckets"], 2)
1916 self.failUnlessEqual(rec["actual-shares"], 2)
1917 self.failUnlessEqual(rec["original-shares"], 2)
1918 self.failUnlessEqual(rec["configured-shares"], 2)
1919 size = sf0_size + sf2_size
1920 self.failUnlessEqual(rec["actual-sharebytes"], size)
1921 self.failUnlessEqual(rec["original-sharebytes"], size)
1922 self.failUnlessEqual(rec["configured-sharebytes"], size)
1923 # different platforms have different notions of "blocks used by
1924 # this file", so merely assert that it's a number
1925 self.failUnless(rec["actual-diskbytes"] >= 0,
1926 rec["actual-diskbytes"])
1927 self.failUnless(rec["original-diskbytes"] >= 0,
1928 rec["original-diskbytes"])
1929 self.failUnless(rec["configured-diskbytes"] >= 0,
1930 rec["configured-diskbytes"])
1931 d.addCallback(_after_first_cycle)
1932 d.addCallback(lambda ign: self.render1(webstatus))
1933 def _check_html(html):
1934 s = remove_tags(html)
# 2000s == 33 minutes, matching the rendered expiration notice
1935 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1936 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1937 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1938 d.addCallback(_check_html)
# Exercise the lease crawler in "cutoff-date" mode: leases last renewed
# before the cutoff (2000s ago) are expired.  Mirrors test_expire_age but
# with date-based configuration.  (This listing elides some original lines.)
1941 def test_expire_cutoff_date(self):
1942 basedir = "storage/LeaseCrawler/expire_cutoff_date"
1943 fileutil.make_dirs(basedir)
1944 # setting cutoff-date to 2000 seconds ago means that any lease which
1945 # is more than 2000s old will be expired.
# NOTE(review): 'now' is presumably time.time() captured just above
# (elided from this listing) -- confirm against the full file.
1947 then = int(now - 2000)
1948 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1949 expiration_enabled=True,
1950 expiration_mode="cutoff-date",
1951 expiration_cutoff_date=then)
1952 # make it start sooner than usual.
1953 lc = ss.lease_checker
1955 lc.stop_after_first_bucket = True
1956 webstatus = StorageStatus(ss)
1958 # create a few shares, with some leases on them
1959 self.make_shares(ss)
1960 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# small helpers over the server's share iterator
1962 def count_shares(si):
1963 return len(list(ss._iter_share_files(si)))
1964 def _get_sharefile(si):
1965 return list(ss._iter_share_files(si))[0]
1966 def count_leases(si):
1967 return len(list(_get_sharefile(si).get_leases()))
# baseline: four buckets with one share each; si_1 and si_3 carry two leases
1969 self.failUnlessEqual(count_shares(immutable_si_0), 1)
1970 self.failUnlessEqual(count_leases(immutable_si_0), 1)
1971 self.failUnlessEqual(count_shares(immutable_si_1), 1)
1972 self.failUnlessEqual(count_leases(immutable_si_1), 2)
1973 self.failUnlessEqual(count_shares(mutable_si_2), 1)
1974 self.failUnlessEqual(count_leases(mutable_si_2), 1)
1975 self.failUnlessEqual(count_shares(mutable_si_3), 1)
1976 self.failUnlessEqual(count_leases(mutable_si_3), 2)
1978 # artificially crank back the expiration time on the first lease of
1979 # each share, to make it look like was renewed 3000s ago. To achieve
1980 # this, we need to set the expiration time to now-3000+31days. This
1981 # will change when the lease format is improved to contain both
1982 # create/renew time and duration.
1983 new_expiration_time = now - 3000 + 31*24*60*60
1985 # Some shares have an extra lease which is set to expire at the
1986 # default time in 31 days from now (age=31days). We then run the
1987 # crawler, which will expire the first lease, making some shares get
1988 # deleted and others stay alive (with one remaining lease)
1990 sf0 = _get_sharefile(immutable_si_0)
1991 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1992 sf0_size = os.stat(sf0.home).st_size
1994 # immutable_si_1 gets an extra lease
1995 sf1 = _get_sharefile(immutable_si_1)
1996 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1998 sf2 = _get_sharefile(mutable_si_2)
1999 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2000 sf2_size = os.stat(sf2.home).st_size
2002 # mutable_si_3 gets an extra lease
2003 sf3 = _get_sharefile(mutable_si_3)
2004 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2006 ss.setServiceParent(self.s)
2008 d = fireEventually()
2009 # examine the state right after the first bucket has been processed
2010 def _after_first_bucket(ignored):
2011 p = lc.get_progress()
2012 if not p["cycle-in-progress"]:
# crawler cycle hasn't started yet -- re-schedule ourselves and poll again
2013 d2 = fireEventually()
2014 d2.addCallback(_after_first_bucket)
2016 d.addCallback(_after_first_bucket)
2017 d.addCallback(lambda ign: self.render1(webstatus))
2018 def _check_html_in_cycle(html):
2019 s = remove_tags(html)
2020 # the first bucket encountered gets deleted, and its prefix
2021 # happens to be about 1/5th of the way through the ring, so the
2022 # predictor thinks we'll have 5 shares and that we'll delete them
2023 # all. This part of the test depends upon the SIs landing right
2024 # where they do now.
2025 self.failUnlessIn("The remainder of this cycle is expected to "
2026 "recover: 4 shares, 4 buckets", s)
2027 self.failUnlessIn("The whole cycle is expected to examine "
2028 "5 shares in 5 buckets and to recover: "
2029 "5 shares, 5 buckets", s)
2030 d.addCallback(_check_html_in_cycle)
2032 # wait for the crawler to finish the first cycle. Two shares should
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
2035 return bool(lc.get_state()["last-cycle-finished"] is not None)
2036 d.addCallback(lambda ign: self.poll(_wait))
2038 def _after_first_cycle(ignored):
# si_0 and si_2 lost their only lease and were deleted; si_1 and si_3
# survive with just the 31-day lease remaining
2039 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2040 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2041 self.failUnlessEqual(count_leases(immutable_si_1), 1)
2042 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2043 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2044 self.failUnlessEqual(count_leases(mutable_si_3), 1)
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
2047 last = s["history"][0]
2049 self.failUnlessEqual(last["expiration-enabled"], True)
2050 self.failUnlessEqual(last["configured-expiration-mode"],
2051 ("cutoff-date", None, then,
2052 ("mutable", "immutable")))
# NOTE(review): expected histogram value continues on elided lines
2053 self.failUnlessEqual(last["leases-per-share-histogram"],
2056 rec = last["space-recovered"]
2057 self.failUnlessEqual(rec["examined-buckets"], 4)
2058 self.failUnlessEqual(rec["examined-shares"], 4)
2059 self.failUnlessEqual(rec["actual-buckets"], 2)
# "original" counters stay 0 in cutoff-date mode (no original lease
# duration to account against), unlike test_expire_age
2060 self.failUnlessEqual(rec["original-buckets"], 0)
2061 self.failUnlessEqual(rec["configured-buckets"], 2)
2062 self.failUnlessEqual(rec["actual-shares"], 2)
2063 self.failUnlessEqual(rec["original-shares"], 0)
2064 self.failUnlessEqual(rec["configured-shares"], 2)
2065 size = sf0_size + sf2_size
2066 self.failUnlessEqual(rec["actual-sharebytes"], size)
2067 self.failUnlessEqual(rec["original-sharebytes"], 0)
2068 self.failUnlessEqual(rec["configured-sharebytes"], size)
2069 # different platforms have different notions of "blocks used by
2070 # this file", so merely assert that it's a number
2071 self.failUnless(rec["actual-diskbytes"] >= 0,
2072 rec["actual-diskbytes"])
2073 self.failUnless(rec["original-diskbytes"] >= 0,
2074 rec["original-diskbytes"])
2075 self.failUnless(rec["configured-diskbytes"] >= 0,
2076 rec["configured-diskbytes"])
2077 d.addCallback(_after_first_cycle)
2078 d.addCallback(lambda ign: self.render1(webstatus))
2079 def _check_html(html):
2080 s = remove_tags(html)
2081 self.failUnlessIn("Expiration Enabled:"
2082 " expired leases will be removed", s)
2083 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2084 substr = "Leases created or last renewed before %s will be considered expired." % date
2085 self.failUnlessIn(substr, s)
2086 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2087 d.addCallback(_check_html)
# With expiration_sharetypes=("immutable",) only immutable shares are
# expired; mutable shares keep all their leases even though they are
# equally stale.  (This listing elides some original lines.)
2090 def test_only_immutable(self):
2091 basedir = "storage/LeaseCrawler/only_immutable"
2092 fileutil.make_dirs(basedir)
# NOTE(review): 'now' is presumably time.time() captured just above
# (elided from this listing) -- confirm against the full file.
2094 then = int(now - 2000)
2095 ss = StorageServer(basedir, "\x00" * 20,
2096 expiration_enabled=True,
2097 expiration_mode="cutoff-date",
2098 expiration_cutoff_date=then,
2099 expiration_sharetypes=("immutable",))
2100 lc = ss.lease_checker
2102 webstatus = StorageStatus(ss)
2104 self.make_shares(ss)
2105 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2106 # set all leases to be expirable
2107 new_expiration_time = now - 3000 + 31*24*60*60
# small helpers over the server's share iterator
2109 def count_shares(si):
2110 return len(list(ss._iter_share_files(si)))
2111 def _get_sharefile(si):
2112 return list(ss._iter_share_files(si))[0]
2113 def count_leases(si):
2114 return len(list(_get_sharefile(si).get_leases()))
# backdate every lease on every share so all are past the cutoff
2116 sf0 = _get_sharefile(immutable_si_0)
2117 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2118 sf1 = _get_sharefile(immutable_si_1)
2119 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2120 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2121 sf2 = _get_sharefile(mutable_si_2)
2122 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2123 sf3 = _get_sharefile(mutable_si_3)
2124 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2125 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2127 ss.setServiceParent(self.s)
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
2129 return bool(lc.get_state()["last-cycle-finished"] is not None)
2130 d = self.poll(_wait)
2132 def _after_first_cycle(ignored):
# immutable shares were deleted; mutable shares and their leases survive
2133 self.failUnlessEqual(count_shares(immutable_si_0), 0)
2134 self.failUnlessEqual(count_shares(immutable_si_1), 0)
2135 self.failUnlessEqual(count_shares(mutable_si_2), 1)
2136 self.failUnlessEqual(count_leases(mutable_si_2), 1)
2137 self.failUnlessEqual(count_shares(mutable_si_3), 1)
2138 self.failUnlessEqual(count_leases(mutable_si_3), 2)
2139 d.addCallback(_after_first_cycle)
2140 d.addCallback(lambda ign: self.render1(webstatus))
2141 def _check_html(html):
2142 s = remove_tags(html)
2143 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2144 d.addCallback(_check_html)
# Mirror of test_only_immutable with expiration_sharetypes=("mutable",):
# only mutable shares are expired, immutable ones keep their leases.
# (This listing elides some original lines.)
2147 def test_only_mutable(self):
2148 basedir = "storage/LeaseCrawler/only_mutable"
2149 fileutil.make_dirs(basedir)
# NOTE(review): 'now' is presumably time.time() captured just above
# (elided from this listing) -- confirm against the full file.
2151 then = int(now - 2000)
2152 ss = StorageServer(basedir, "\x00" * 20,
2153 expiration_enabled=True,
2154 expiration_mode="cutoff-date",
2155 expiration_cutoff_date=then,
2156 expiration_sharetypes=("mutable",))
2157 lc = ss.lease_checker
2159 webstatus = StorageStatus(ss)
2161 self.make_shares(ss)
2162 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2163 # set all leases to be expirable
2164 new_expiration_time = now - 3000 + 31*24*60*60
# small helpers over the server's share iterator
2166 def count_shares(si):
2167 return len(list(ss._iter_share_files(si)))
2168 def _get_sharefile(si):
2169 return list(ss._iter_share_files(si))[0]
2170 def count_leases(si):
2171 return len(list(_get_sharefile(si).get_leases()))
# backdate every lease on every share so all are past the cutoff
2173 sf0 = _get_sharefile(immutable_si_0)
2174 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2175 sf1 = _get_sharefile(immutable_si_1)
2176 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2177 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2178 sf2 = _get_sharefile(mutable_si_2)
2179 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2180 sf3 = _get_sharefile(mutable_si_3)
2181 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2182 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2184 ss.setServiceParent(self.s)
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
2186 return bool(lc.get_state()["last-cycle-finished"] is not None)
2187 d = self.poll(_wait)
2189 def _after_first_cycle(ignored):
# mutable shares were deleted; immutable shares and their leases survive
2190 self.failUnlessEqual(count_shares(immutable_si_0), 1)
2191 self.failUnlessEqual(count_leases(immutable_si_0), 1)
2192 self.failUnlessEqual(count_shares(immutable_si_1), 1)
2193 self.failUnlessEqual(count_leases(immutable_si_1), 2)
2194 self.failUnlessEqual(count_shares(mutable_si_2), 0)
2195 self.failUnlessEqual(count_shares(mutable_si_3), 0)
2196 d.addCallback(_after_first_cycle)
2197 d.addCallback(lambda ign: self.render1(webstatus))
2198 def _check_html(html):
2199 s = remove_tags(html)
2200 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2201 d.addCallback(_check_html)
# An unrecognized expiration_mode must be rejected at server construction
# time with a descriptive ValueError.
2204 def test_bad_mode(self):
2205 basedir = "storage/LeaseCrawler/bad_mode"
2206 fileutil.make_dirs(basedir)
2207 e = self.failUnlessRaises(ValueError,
2208 StorageServer, basedir, "\x00" * 20,
2209 expiration_mode="bogus")
2210 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
# time_format.parse_duration accepts singular/plural unit suffixes with or
# without a space, and rejects unknown units.
# NOTE(review): DAY/MONTH/YEAR constants are defined on lines elided from
# this listing -- confirm against the full file.
2212 def test_parse_duration(self):
2216 p = time_format.parse_duration
2217 self.failUnlessEqual(p("7days"), 7*DAY)
2218 self.failUnlessEqual(p("31day"), 31*DAY)
2219 self.failUnlessEqual(p("60 days"), 60*DAY)
2220 self.failUnlessEqual(p("2mo"), 2*MONTH)
2221 self.failUnlessEqual(p("3 month"), 3*MONTH)
2222 self.failUnlessEqual(p("2years"), 2*YEAR)
2223 e = self.failUnlessRaises(ValueError, p, "2kumquats")
2224 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
# time_format.parse_date turns an ISO date string into an integer
# seconds-since-epoch value (1237334400 == 2009-03-18 00:00 UTC).
2226 def test_parse_date(self):
2227 p = time_format.parse_date
2228 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2229 self.failUnlessEqual(p("2009-03-18"), 1237334400)
# The crawler's per-cycle history is bounded: after 15+ cycles only the 10
# most recent entries (cycles 6..15) remain.  (Listing elides the lines
# that speed up the crawler and fetch the history dict 'h'.)
2231 def test_limited_history(self):
2232 basedir = "storage/LeaseCrawler/limited_history"
2233 fileutil.make_dirs(basedir)
2234 ss = StorageServer(basedir, "\x00" * 20)
2235 # make it start sooner than usual.
2236 lc = ss.lease_checker
2240 # create a few shares, with some leases on them
2241 self.make_shares(ss)
2243 ss.setServiceParent(self.s)
2245 def _wait_until_15_cycles_done():
2246 last = lc.state["last-cycle-finished"]
# NOTE(review): the 'return True/False' bodies of this predicate are
# elided from this listing -- confirm against the full file.
2247 if last is not None and last >= 15:
2252 d = self.poll(_wait_until_15_cycles_done)
2254 def _check(ignored):
# NOTE(review): 'h' presumably comes from lc.get_state()["history"]
# (elided) -- confirm
2257 self.failUnlessEqual(len(h), 10)
2258 self.failUnlessEqual(max(h.keys()), 15)
2259 self.failUnlessEqual(min(h.keys()), 6)
2260 d.addCallback(_check)
# With 0% cycle progress the expirer cannot predict the future, so every
# "space-recovered" estimate field must be None for both the remaining-
# and current-cycle estimates.  (This listing elides some original lines.)
2263 def test_unpredictable_future(self):
2264 basedir = "storage/LeaseCrawler/unpredictable_future"
2265 fileutil.make_dirs(basedir)
2266 ss = StorageServer(basedir, "\x00" * 20)
2267 # make it start sooner than usual.
2268 lc = ss.lease_checker
2270 lc.cpu_slice = -1.0 # stop quickly
2272 self.make_shares(ss)
2274 ss.setServiceParent(self.s)
2276 d = fireEventually()
2277 def _check(ignored):
2278 # this should fire after the first bucket is complete, but before
2279 # the first prefix is complete, so the progress-measurer won't
2280 # think we've gotten far enough to raise our percent-complete
2281 # above 0%, triggering the cannot-predict-the-future code in
2282 # expirer.py . This will have to change if/when the
2283 # progress-measurer gets smart enough to count buckets (we'll
2284 # have to interrupt it even earlier, before it's finished the
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
2287 if "cycle-to-date" not in s:
# crawler hasn't started its cycle yet -- re-schedule and poll again
2288 d2 = fireEventually()
2289 d2.addCallback(_check)
2291 self.failUnlessIn("cycle-to-date", s)
2292 self.failUnlessIn("estimated-remaining-cycle", s)
2293 self.failUnlessIn("estimated-current-cycle", s)
2295 left = s["estimated-remaining-cycle"]["space-recovered"]
2296 self.failUnlessEqual(left["actual-buckets"], None)
2297 self.failUnlessEqual(left["original-buckets"], None)
2298 self.failUnlessEqual(left["configured-buckets"], None)
2299 self.failUnlessEqual(left["actual-shares"], None)
2300 self.failUnlessEqual(left["original-shares"], None)
2301 self.failUnlessEqual(left["configured-shares"], None)
2302 self.failUnlessEqual(left["actual-diskbytes"], None)
2303 self.failUnlessEqual(left["original-diskbytes"], None)
2304 self.failUnlessEqual(left["configured-diskbytes"], None)
2305 self.failUnlessEqual(left["actual-sharebytes"], None)
2306 self.failUnlessEqual(left["original-sharebytes"], None)
2307 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): apparent copy-paste defect -- 'full' re-reads
# "estimated-remaining-cycle"; the name and the assertion on line 2293
# suggest "estimated-current-cycle" was intended.  Confirm and fix
# against the full file.
2309 full = s["estimated-remaining-cycle"]["space-recovered"]
2310 self.failUnlessEqual(full["actual-buckets"], None)
2311 self.failUnlessEqual(full["original-buckets"], None)
2312 self.failUnlessEqual(full["configured-buckets"], None)
2313 self.failUnlessEqual(full["actual-shares"], None)
2314 self.failUnlessEqual(full["original-shares"], None)
2315 self.failUnlessEqual(full["configured-shares"], None)
2316 self.failUnlessEqual(full["actual-diskbytes"], None)
2317 self.failUnlessEqual(full["original-diskbytes"], None)
2318 self.failUnlessEqual(full["configured-diskbytes"], None)
2319 self.failUnlessEqual(full["actual-sharebytes"], None)
2320 self.failUnlessEqual(full["original-sharebytes"], None)
2321 self.failUnlessEqual(full["configured-sharebytes"], None)
2323 d.addCallback(_check)
# On platforms whose os.stat() lacks .st_blocks, the expirer must fall back
# to reporting diskbytes == sharebytes.  Uses a StorageServer subclass that
# strips st_blocks from stat results.  (Listing elides some lines.)
2326 def test_no_st_blocks(self):
2327 basedir = "storage/LeaseCrawler/no_st_blocks"
2328 fileutil.make_dirs(basedir)
2329 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2330 expiration_mode="age",
2331 expiration_override_lease_duration=-1000)
2332 # a negative expiration_time= means the "configured-"
2333 # space-recovered counts will be non-zero, since all shares will have
2336 # make it start sooner than usual.
2337 lc = ss.lease_checker
2340 self.make_shares(ss)
2341 ss.setServiceParent(self.s)
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
2343 return bool(lc.get_state()["last-cycle-finished"] is not None)
2344 d = self.poll(_wait)
2346 def _check(ignored):
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
2348 last = s["history"][0]
2349 rec = last["space-recovered"]
2350 self.failUnlessEqual(rec["configured-buckets"], 4)
2351 self.failUnlessEqual(rec["configured-shares"], 4)
2352 self.failUnless(rec["configured-sharebytes"] > 0,
2353 rec["configured-sharebytes"])
2354 # without the .st_blocks field in os.stat() results, we should be
2355 # reporting diskbytes==sharebytes
2356 self.failUnlessEqual(rec["configured-sharebytes"],
2357 rec["configured-diskbytes"])
2358 d.addCallback(_check)
# A share file with a corrupted header must be reported as a corrupt share
# (in state, JSON, and HTML renderings) without derailing the crawler; the
# rest of the buckets are still examined.  (Listing elides some lines.)
2361 def test_share_corruption(self):
# tell the poller not to abort when these container-version errors are
# logged -- they are the expected result of the deliberate corruption
2362 self._poll_should_ignore_these_errors = [
2363 UnknownMutableContainerVersionError,
2364 UnknownImmutableContainerVersionError,
2366 basedir = "storage/LeaseCrawler/share_corruption"
2367 fileutil.make_dirs(basedir)
2368 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2369 w = StorageStatus(ss)
2370 # make it start sooner than usual.
2371 lc = ss.lease_checker
2372 lc.stop_after_first_bucket = True
2376 # create a few shares, with some leases on them
2377 self.make_shares(ss)
2379 # now corrupt one, and make sure the lease-checker keeps going
2380 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2381 first = min(self.sis)
2382 first_b32 = base32.b2a(first)
2383 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
# NOTE(review): the 'f = open(fn, "rb+")' / close lines around this write
# are elided from this listing -- confirm against the full file.
2386 f.write("BAD MAGIC")
2388 # if get_share_file() doesn't see the correct mutable magic, it
2389 # assumes the file is an immutable share, and then
2390 # immutable.ShareFile sees a bad version. So regardless of which kind
2391 # of share we corrupted, this will trigger an
2392 # UnknownImmutableContainerVersionError.
2394 # also create an empty bucket
2395 empty_si = base32.b2a("\x04"*16)
2396 empty_bucket_dir = os.path.join(ss.sharedir,
2397 storage_index_to_dir(empty_si))
2398 fileutil.make_dirs(empty_bucket_dir)
2400 ss.setServiceParent(self.s)
2402 d = fireEventually()
2404 # now examine the state right after the first bucket has been
2406 def _after_first_bucket(ignored):
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
2408 if "cycle-to-date" not in s:
# crawler hasn't started its cycle yet -- re-schedule and poll again
2409 d2 = fireEventually()
2410 d2.addCallback(_after_first_bucket)
2412 so_far = s["cycle-to-date"]
2413 rec = so_far["space-recovered"]
# the corrupt bucket was examined but yielded no countable shares
2414 self.failUnlessEqual(rec["examined-buckets"], 1)
2415 self.failUnlessEqual(rec["examined-shares"], 0)
2416 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2417 d.addCallback(_after_first_bucket)
2419 d.addCallback(lambda ign: self.render_json(w))
2420 def _check_json(json):
2421 data = simplejson.loads(json)
2422 # grr. json turns all dict keys into strings.
2423 so_far = data["lease-checker"]["cycle-to-date"]
2424 corrupt_shares = so_far["corrupt-shares"]
2425 # it also turns all tuples into lists
2426 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2427 d.addCallback(_check_json)
2428 d.addCallback(lambda ign: self.render1(w))
2429 def _check_html(html):
2430 s = remove_tags(html)
2431 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2432 d.addCallback(_check_html)
# NOTE(review): the 'def _wait():' header for this predicate is elided here.
2435 return bool(lc.get_state()["last-cycle-finished"] is not None)
2436 d.addCallback(lambda ign: self.poll(_wait))
2438 def _after_first_cycle(ignored):
# NOTE(review): 's' presumably comes from lc.get_state() (elided) -- confirm
2440 last = s["history"][0]
2441 rec = last["space-recovered"]
# 4 real buckets + the empty one; the corrupt share is excluded from
# examined-shares but still recorded in corrupt-shares
2442 self.failUnlessEqual(rec["examined-buckets"], 5)
2443 self.failUnlessEqual(rec["examined-shares"], 3)
2444 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2445 d.addCallback(_after_first_cycle)
2446 d.addCallback(lambda ign: self.render_json(w))
2447 def _check_json_history(json):
2448 data = simplejson.loads(json)
2449 last = data["lease-checker"]["history"]["0"]
2450 corrupt_shares = last["corrupt-shares"]
2451 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2452 d.addCallback(_check_json_history)
2453 d.addCallback(lambda ign: self.render1(w))
2454 def _check_html_history(html):
2455 s = remove_tags(html)
2456 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2457 d.addCallback(_check_html_history)
# silence the errors we deliberately provoked so trial doesn't fail
2460 self.flushLoggedErrors(UnknownMutableContainerVersionError,
2461 UnknownImmutableContainerVersionError)
# Render the given status page with ?t=json and return the Deferred that
# fires with the JSON body.
# NOTE(review): the trailing 'return d' is elided from this listing.
2466 def render_json(self, page):
2467 d = self.render1(page, args={"t": ["json"]})
# Tests for the storage-server web status page (rendering, disk-stats
# handling, reserved-space display).
2470 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# NOTE(review): the 'def setUp(self):' and 'def tearDown(self):' headers
# for the following two bodies are elided from this listing -- confirm.
# setUp: parent MultiService that hosts each test's StorageServer
2473 self.s = service.MultiService()
2474 self.s.startService()
# tearDown: returns the stop Deferred so trial waits for shutdown
2476 return self.s.stopService()
# StorageStatus(None) must render a "no server" page rather than crash.
2478 def test_no_server(self):
2479 w = StorageStatus(None)
2480 html = w.renderSynchronously()
2481 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
# Basic status page for a default server: HTML shows accepting/reserved
# state, JSON carries the server stats plus crawler sections.
2483 def test_status(self):
2484 basedir = "storage/WebStatus/status"
2485 fileutil.make_dirs(basedir)
2486 ss = StorageServer(basedir, "\x00" * 20)
2487 ss.setServiceParent(self.s)
2488 w = StorageStatus(ss)
# NOTE(review): 'd = self.render1(w)' is elided from this listing -- confirm
2490 def _check_html(html):
2491 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2492 s = remove_tags(html)
2493 self.failUnlessIn("Accepting new shares: Yes", s)
2494 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2495 d.addCallback(_check_html)
2496 d.addCallback(lambda ign: self.render_json(w))
2497 def _check_json(json):
2498 data = simplejson.loads(json)
# NOTE(review): 's' presumably comes from data["stats"] (elided) -- confirm
2500 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2501 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2502 self.failUnlessIn("bucket-counter", data)
2503 self.failUnlessIn("lease-checker", data)
2504 d.addCallback(_check_json)
# Render the given status page with ?t=json and return the Deferred that
# fires with the JSON body.
# NOTE(review): the trailing 'return d' is elided from this listing.
2507 def render_json(self, page):
2508 d = self.render1(page, args={"t": ["json"]})
# If the platform has no disk-stats API at all (AttributeError), the page
# shows "?" for space figures, shares are still accepted, and
# get_available_space() returns None.
2511 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2512 def test_status_no_disk_stats(self, mock_get_disk_stats):
2513 mock_get_disk_stats.side_effect = AttributeError()
2515 # Some platforms may have no disk stats API. Make sure the code can handle that
2516 # (test runs on all platforms).
2517 basedir = "storage/WebStatus/status_no_disk_stats"
2518 fileutil.make_dirs(basedir)
2519 ss = StorageServer(basedir, "\x00" * 20)
2520 ss.setServiceParent(self.s)
2521 w = StorageStatus(ss)
2522 html = w.renderSynchronously()
2523 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2524 s = remove_tags(html)
2525 self.failUnlessIn("Accepting new shares: Yes", s)
2526 self.failUnlessIn("Total disk space: ?", s)
2527 self.failUnlessIn("Space Available to Tahoe: ?", s)
2528 self.failUnless(ss.get_available_space() is None)
# If the disk-stats call exists but fails (OSError), the server must stop
# accepting shares and get_available_space() must report 0 -- contrast with
# the None returned when the API is simply absent.
2530 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2531 def test_status_bad_disk_stats(self, mock_get_disk_stats):
2532 mock_get_disk_stats.side_effect = OSError()
2534 # If the API to get disk stats exists but a call to it fails, then the status should
2535 # show that no shares will be accepted, and get_available_space() should be 0.
2536 basedir = "storage/WebStatus/status_bad_disk_stats"
2537 fileutil.make_dirs(basedir)
2538 ss = StorageServer(basedir, "\x00" * 20)
2539 ss.setServiceParent(self.s)
2540 w = StorageStatus(ss)
2541 html = w.renderSynchronously()
2542 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2543 s = remove_tags(html)
2544 self.failUnlessIn("Accepting new shares: No", s)
2545 self.failUnlessIn("Total disk space: ?", s)
2546 self.failUnlessIn("Space Available to Tahoe: ?", s)
2547 self.failUnlessEqual(ss.get_available_space(), 0)
# With mocked disk stats, the status page must render exactly the figures
# returned, and the stats helper must be called with (sharedir,
# reserved_space).  (This listing elides some original lines.)
2549 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2550 def test_status_right_disk_stats(self, mock_get_disk_stats):
# NOTE(review): 'GB = 1000000000' and 'total = 5*GB' appear to be defined
# on lines elided from this listing -- confirm against the full file.
2553 free_for_root = 4*GB
2554 free_for_nonroot = 3*GB
2555 reserved_space = 1*GB
2556 used = total - free_for_root
2557 avail = max(free_for_nonroot - reserved_space, 0)
2558 mock_get_disk_stats.return_value = {
# NOTE(review): the 'total'/'used'/'avail' entries and the closing brace
# of this dict are elided from this listing -- confirm.
2560 'free_for_root': free_for_root,
2561 'free_for_nonroot': free_for_nonroot,
2566 basedir = "storage/WebStatus/status_right_disk_stats"
2567 fileutil.make_dirs(basedir)
2568 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
2569 expecteddir = ss.sharedir
2570 ss.setServiceParent(self.s)
2571 w = StorageStatus(ss)
2572 html = w.renderSynchronously()
# every call to get_disk_stats must have used the share dir + reservation
2574 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
2575 mock_get_disk_stats.call_args_list)
2577 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2578 s = remove_tags(html)
2579 self.failUnlessIn("Total disk space: 5.00 GB", s)
2580 self.failUnlessIn("Disk space used: - 1.00 GB", s)
2581 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
2582 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
2583 self.failUnlessIn("Reserved space: - 1.00 GB", s)
2584 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
2585 self.failUnlessEqual(ss.get_available_space(), 2*GB)
# A readonly server must advertise that it is not accepting new shares.
2587 def test_readonly(self):
2588 basedir = "storage/WebStatus/readonly"
2589 fileutil.make_dirs(basedir)
2590 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2591 ss.setServiceParent(self.s)
2592 w = StorageStatus(ss)
2593 html = w.renderSynchronously()
2594 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2595 s = remove_tags(html)
2596 self.failUnlessIn("Accepting new shares: No", s)
# reserved_space=10e6 must be rendered both abbreviated and as the raw
# byte count.
2598 def test_reserved(self):
2599 basedir = "storage/WebStatus/reserved"
2600 fileutil.make_dirs(basedir)
2601 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2602 ss.setServiceParent(self.s)
2603 w = StorageStatus(ss)
2604 html = w.renderSynchronously()
2605 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2606 s = remove_tags(html)
2607 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
# NOTE(review): this test is byte-identical to test_reserved above -- it
# reuses the "storage/WebStatus/reserved" basedir and the same 10e6
# reservation.  The name suggests it was meant to exercise a genuinely
# huge reserved_space value (e.g. > 2**32); confirm intent and fix.
2609 def test_huge_reserved(self):
2610 basedir = "storage/WebStatus/reserved"
2611 fileutil.make_dirs(basedir)
2612 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2613 ss.setServiceParent(self.s)
2614 w = StorageStatus(ss)
2615 html = w.renderSynchronously()
2616 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2617 s = remove_tags(html)
2618 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
# Unit checks for the StorageStatus rendering helpers: None renders as "?",
# numbers as exact or abbreviated sizes; remove_prefix strips only a
# matching prefix and returns None otherwise.
2620 def test_util(self):
2621 w = StorageStatus(None)
2622 self.failUnlessEqual(w.render_space(None, None), "?")
2623 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2624 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2625 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2626 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2627 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)