1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
40 def __init__(self, ignore_disconnectors=False):
41 self.ignore = ignore_disconnectors
42 self.disconnectors = {}
43 def notifyOnDisconnect(self, f, *args, **kwargs):
47 self.disconnectors[m] = (f, args, kwargs)
49 def dontNotifyOnDisconnect(self, marker):
52 del self.disconnectors[marker]
54 class FakeStatsProvider:
55 def count(self, name, delta=1):
57 def register_producer(self, producer):
60 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Prepare the scratch directories for one Bucket test.

    Creates ``storage/Bucket/<name>`` and its ``tmp`` subdirectory, then
    returns the ``(incoming, final)`` pair of share paths located in
    ``tmp`` and in the test directory respectively. The share files
    themselves are not created here.
    """
    root = os.path.join("storage", "Bucket", name)
    staging = os.path.join(root, "tmp")
    # Same creation order as before: the test directory, then its tmp child.
    for directory in (root, staging):
        fileutil.make_dirs(directory)
    return os.path.join(staging, "bucket"), os.path.join(root, "bucket")
69 def bucket_writer_closed(self, bw, consumed):
71 def add_latency(self, category, latency):
73 def count(self, name, delta=1):
78 renew_secret = os.urandom(32)
79 cancel_secret = os.urandom(32)
80 expiration_time = time.time() + 5000
81 return LeaseInfo(owner_num, renew_secret, cancel_secret,
82 expiration_time, "\x00" * 20)
84 def test_create(self):
85 incoming, final = self.make_workdir("test_create")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*25)
91 bw.remote_write(75, "d"*7)
94 def test_readwrite(self):
95 incoming, final = self.make_workdir("test_readwrite")
96 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
98 bw.remote_write(0, "a"*25)
99 bw.remote_write(25, "b"*25)
100 bw.remote_write(50, "c"*7) # last block may be short
104 br = BucketReader(self, bw.finalhome)
105 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
115 def callRemote(self, methname, *args, **kwargs):
117 meth = getattr(self.target, "remote_" + methname)
118 return meth(*args, **kwargs)
120 if methname == "slot_readv":
122 if "writev" in methname:
123 self.write_count += 1
125 return defer.maybeDeferred(_call)
128 class BucketProxy(unittest.TestCase):
129 def make_bucket(self, name, size):
130 basedir = os.path.join("storage", "BucketProxy", name)
131 incoming = os.path.join(basedir, "tmp", "bucket")
132 final = os.path.join(basedir, "bucket")
133 fileutil.make_dirs(basedir)
134 fileutil.make_dirs(os.path.join(basedir, "tmp"))
135 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
141 def make_lease(self):
143 renew_secret = os.urandom(32)
144 cancel_secret = os.urandom(32)
145 expiration_time = time.time() + 5000
146 return LeaseInfo(owner_num, renew_secret, cancel_secret,
147 expiration_time, "\x00" * 20)
149 def bucket_writer_closed(self, bw, consumed):
151 def add_latency(self, category, latency):
153 def count(self, name, delta=1):
156 def test_create(self):
157 bw, rb, sharefname = self.make_bucket("test_create", 500)
158 bp = WriteBucketProxy(rb, None,
163 uri_extension_size_max=500)
164 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
166 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
167 # Let's pretend each share has 100 bytes of data, and that there are
168 # 4 segments (25 bytes each), and 8 shares total. So the two
169 # per-segment merkle trees (crypttext_hash_tree,
170 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
171 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
172 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
173 # long. That should make the whole share:
175 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
176 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
178 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
180 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
182 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
184 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
186 uri_extension = "s" + "E"*498 + "e"
188 bw, rb, sharefname = self.make_bucket(name, sharesize)
189 bp = wbp_class(rb, None,
194 uri_extension_size_max=len(uri_extension))
197 d.addCallback(lambda res: bp.put_block(0, "a"*25))
198 d.addCallback(lambda res: bp.put_block(1, "b"*25))
199 d.addCallback(lambda res: bp.put_block(2, "c"*25))
200 d.addCallback(lambda res: bp.put_block(3, "d"*20))
201 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
202 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
203 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
204 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
205 d.addCallback(lambda res: bp.close())
207 # now read everything back
208 def _start_reading(res):
209 br = BucketReader(self, sharefname)
212 server = NoNetworkServer("abc", None)
213 rbp = rbp_class(rb, server, storage_index="")
214 self.failUnlessIn("to peer", repr(rbp))
215 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
217 d1 = rbp.get_block_data(0, 25, 25)
218 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
219 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
220 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
221 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
222 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
223 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
224 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
226 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
227 d1.addCallback(lambda res:
228 self.failUnlessEqual(res, crypttext_hashes))
229 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
230 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
231 d1.addCallback(lambda res: rbp.get_share_hashes())
232 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
233 d1.addCallback(lambda res: rbp.get_uri_extension())
234 d1.addCallback(lambda res:
235 self.failUnlessEqual(res, uri_extension))
239 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    """Run the shared read/write round trip with WriteBucketProxy.

    Passes a header size of 0x24 and returns whatever
    _do_test_readwrite returns.
    """
    header_size = 0x24
    return self._do_test_readwrite("test_readwrite_v1", header_size,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    """Run the shared read/write round trip with WriteBucketProxy_v2.

    Passes a header size of 0x44 and returns whatever
    _do_test_readwrite returns.
    """
    header_size = 0x44
    return self._do_test_readwrite("test_readwrite_v2", header_size,
                                   WriteBucketProxy_v2, ReadBucketProxy)
251 class Server(unittest.TestCase):
254 self.sparent = LoggingServiceParent()
255 self.sparent.startService()
256 self._lease_secret = itertools.count()
258 return self.sparent.stopService()
260 def workdir(self, name):
261 basedir = os.path.join("storage", "Server", name)
264 def create(self, name, reserved_space=0, klass=StorageServer):
265 workdir = self.workdir(name)
266 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
267 stats_provider=FakeStatsProvider())
268 ss.setServiceParent(self.sparent)
271 def test_create(self):
272 self.create("test_create")
274 def allocate(self, ss, storage_index, sharenums, size, canary=None):
275 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
276 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
278 canary = FakeCanary()
279 return ss.remote_allocate_buckets(storage_index,
280 renew_secret, cancel_secret,
281 sharenums, size, canary)
283 def test_large_share(self):
284 syslow = platform.system().lower()
285 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
286 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
288 avail = fileutil.get_available_space('.', 512*2**20)
290 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
292 ss = self.create("test_large_share")
294 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
295 self.failUnlessEqual(already, set())
296 self.failUnlessEqual(set(writers.keys()), set([0]))
298 shnum, bucket = writers.items()[0]
299 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
300 bucket.remote_write(2**32, "ab")
301 bucket.remote_close()
303 readers = ss.remote_get_buckets("allocate")
304 reader = readers[shnum]
305 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
307 def test_dont_overfill_dirs(self):
309 This test asserts that if you add a second share whose storage index
310 share lots of leading bits with an extant share (but isn't the exact
311 same storage index), this won't add an entry to the share directory.
313 ss = self.create("test_dont_overfill_dirs")
314 already, writers = self.allocate(ss, "storageindex", [0], 10)
315 for i, wb in writers.items():
316 wb.remote_write(0, "%10d" % i)
318 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
320 children_of_storedir = set(os.listdir(storedir))
322 # Now store another one under another storageindex that has leading
323 # chars the same as the first storageindex.
324 already, writers = self.allocate(ss, "storageindey", [0], 10)
325 for i, wb in writers.items():
326 wb.remote_write(0, "%10d" % i)
328 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
330 new_children_of_storedir = set(os.listdir(storedir))
331 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
333 def test_remove_incoming(self):
334 ss = self.create("test_remove_incoming")
335 already, writers = self.allocate(ss, "vid", range(3), 10)
336 for i,wb in writers.items():
337 wb.remote_write(0, "%10d" % i)
339 incoming_share_dir = wb.incominghome
340 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
341 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
342 incoming_dir = os.path.dirname(incoming_prefix_dir)
343 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
344 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
345 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
347 def test_abort(self):
348 # remote_abort, when called on a writer, should make sure that
349 # the allocated size of the bucket is not counted by the storage
350 # server when accounting for space.
351 ss = self.create("test_abort")
352 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
353 self.failIfEqual(ss.allocated_size(), 0)
355 # Now abort the writers.
356 for writer in writers.itervalues():
357 writer.remote_abort()
358 self.failUnlessEqual(ss.allocated_size(), 0)
361 def test_allocate(self):
362 ss = self.create("test_allocate")
364 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
366 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
367 self.failUnlessEqual(already, set())
368 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
370 # while the buckets are open, they should not count as readable
371 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
374 for i,wb in writers.items():
375 wb.remote_write(0, "%25d" % i)
377 # aborting a bucket that was already closed is a no-op
380 # now they should be readable
381 b = ss.remote_get_buckets("allocate")
382 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
383 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
385 self.failUnlessIn("BucketReader", b_str)
386 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
388 # now if we ask about writing again, the server should offer those
389 # three buckets as already present. It should offer them even if we
390 # don't ask about those specific ones.
391 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
392 self.failUnlessEqual(already, set([0,1,2]))
393 self.failUnlessEqual(set(writers.keys()), set([3,4]))
395 # while those two buckets are open for writing, the server should
396 # refuse to offer them to uploaders
398 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
399 self.failUnlessEqual(already2, set([0,1,2]))
400 self.failUnlessEqual(set(writers2.keys()), set([5]))
402 # aborting the writes should remove the tempfiles
403 for i,wb in writers2.items():
405 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
406 self.failUnlessEqual(already2, set([0,1,2]))
407 self.failUnlessEqual(set(writers2.keys()), set([5]))
409 for i,wb in writers2.items():
411 for i,wb in writers.items():
414 def test_bad_container_version(self):
415 ss = self.create("test_bad_container_version")
416 a,w = self.allocate(ss, "si1", [0], 10)
417 w[0].remote_write(0, "\xff"*10)
420 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
423 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
426 ss.remote_get_buckets("allocate")
428 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
429 ss.remote_get_buckets, "si1")
430 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
432 def test_disconnect(self):
433 # simulate a disconnection
434 ss = self.create("test_disconnect")
435 canary = FakeCanary()
436 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
437 self.failUnlessEqual(already, set())
438 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
439 for (f,args,kwargs) in canary.disconnectors.values():
444 # that ought to delete the incoming shares
445 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
446 self.failUnlessEqual(already, set())
447 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
449 @mock.patch('allmydata.util.fileutil.get_disk_stats')
450 def test_reserved_space(self, mock_get_disk_stats):
452 mock_get_disk_stats.return_value = {
453 'free_for_nonroot': 15000,
454 'avail': max(15000 - reserved_space, 0),
457 ss = self.create("test_reserved_space", reserved_space=reserved_space)
458 # 15k available, 10k reserved, leaves 5k for shares
460 # a newly created and filled share incurs this much overhead, beyond
461 # the size we request.
463 LEASE_SIZE = 4+32+32+4
464 canary = FakeCanary(True)
465 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
466 self.failUnlessEqual(len(writers), 3)
467 # now the StorageServer should have 3000 bytes provisionally
468 # allocated, allowing only 2000 more to be claimed
469 self.failUnlessEqual(len(ss._active_writers), 3)
471 # allocating 1001-byte shares only leaves room for one
472 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
473 self.failUnlessEqual(len(writers2), 1)
474 self.failUnlessEqual(len(ss._active_writers), 4)
476 # we abandon the first set, so their provisional allocation should be
480 self.failUnlessEqual(len(ss._active_writers), 1)
481 # now we have a provisional allocation of 1001 bytes
483 # and we close the second set, so their provisional allocation should
484 # become real, long-term allocation, and grows to include the
486 for bw in writers2.values():
487 bw.remote_write(0, "a"*25)
492 self.failUnlessEqual(len(ss._active_writers), 0)
494 allocated = 1001 + OVERHEAD + LEASE_SIZE
496 # we have to manually increase available, since we're not doing real
498 mock_get_disk_stats.return_value = {
499 'free_for_nonroot': 15000 - allocated,
500 'avail': max(15000 - allocated - reserved_space, 0),
503 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
504 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
505 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
506 self.failUnlessEqual(len(writers3), 39)
507 self.failUnlessEqual(len(ss._active_writers), 39)
511 self.failUnlessEqual(len(ss._active_writers), 0)
512 ss.disownServiceParent()
516 basedir = self.workdir("test_seek_behavior")
517 fileutil.make_dirs(basedir)
518 filename = os.path.join(basedir, "testfile")
519 f = open(filename, "wb")
522 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
523 # files. mode="a" preserves previous contents but does not allow
524 # seeking-to-create-holes. mode="r+" allows both.
525 f = open(filename, "rb+")
529 filelen = os.stat(filename)[stat.ST_SIZE]
530 self.failUnlessEqual(filelen, 100+3)
531 f2 = open(filename, "rb")
532 self.failUnlessEqual(f2.read(5), "start")
535 def test_leases(self):
536 ss = self.create("test_leases")
537 canary = FakeCanary()
541 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
542 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
543 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
544 sharenums, size, canary)
545 self.failUnlessEqual(len(already), 0)
546 self.failUnlessEqual(len(writers), 5)
547 for wb in writers.values():
550 leases = list(ss.get_leases("si0"))
551 self.failUnlessEqual(len(leases), 1)
552 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
554 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
555 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
556 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
557 sharenums, size, canary)
558 for wb in writers.values():
561 # take out a second lease on si1
562 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
563 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
564 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
565 sharenums, size, canary)
566 self.failUnlessEqual(len(already), 5)
567 self.failUnlessEqual(len(writers), 0)
569 leases = list(ss.get_leases("si1"))
570 self.failUnlessEqual(len(leases), 2)
571 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
573 # and a third lease, using add-lease
574 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
575 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
576 ss.remote_add_lease("si1", rs2a, cs2a)
577 leases = list(ss.get_leases("si1"))
578 self.failUnlessEqual(len(leases), 3)
579 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
581 # add-lease on a missing storage index is silently ignored
582 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
584 # check that si0 is readable
585 readers = ss.remote_get_buckets("si0")
586 self.failUnlessEqual(len(readers), 5)
588 # renew the first lease. Only the proper renew_secret should work
589 ss.remote_renew_lease("si0", rs0)
590 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
591 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
593 # check that si0 is still readable
594 readers = ss.remote_get_buckets("si0")
595 self.failUnlessEqual(len(readers), 5)
598 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
599 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
600 ss.remote_cancel_lease("si0", cs0)
602 # si0 should now be gone
603 readers = ss.remote_get_buckets("si0")
604 self.failUnlessEqual(len(readers), 0)
605 # and the renew should no longer work
606 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
609 # cancel the first lease on si1, leaving the second and third in place
610 ss.remote_cancel_lease("si1", cs1)
611 readers = ss.remote_get_buckets("si1")
612 self.failUnlessEqual(len(readers), 5)
613 # the corresponding renew should no longer work
614 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
616 leases = list(ss.get_leases("si1"))
617 self.failUnlessEqual(len(leases), 2)
618 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
620 ss.remote_renew_lease("si1", rs2)
621 # cancelling the second and third should make it go away
622 ss.remote_cancel_lease("si1", cs2)
623 ss.remote_cancel_lease("si1", cs2a)
624 readers = ss.remote_get_buckets("si1")
625 self.failUnlessEqual(len(readers), 0)
626 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
627 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
628 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
630 leases = list(ss.get_leases("si1"))
631 self.failUnlessEqual(len(leases), 0)
634 # test overlapping uploads
635 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
636 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
637 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
638 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
639 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
640 sharenums, size, canary)
641 self.failUnlessEqual(len(already), 0)
642 self.failUnlessEqual(len(writers), 5)
643 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
644 sharenums, size, canary)
645 self.failUnlessEqual(len(already2), 0)
646 self.failUnlessEqual(len(writers2), 0)
647 for wb in writers.values():
650 leases = list(ss.get_leases("si3"))
651 self.failUnlessEqual(len(leases), 1)
653 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
654 sharenums, size, canary)
655 self.failUnlessEqual(len(already3), 5)
656 self.failUnlessEqual(len(writers3), 0)
658 leases = list(ss.get_leases("si3"))
659 self.failUnlessEqual(len(leases), 2)
def test_readonly(self):
    """A server started with readonly_storage=True must hand out no writers.

    Also checks the published stats: immutable shares are not accepted
    and, where disk stats exist, the advertised available space is zero.
    """
    basedir = self.workdir("test_readonly")
    server = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.sparent)

    present, bucket_writers = self.allocate(server, "vid", [0,1,2], 75)
    self.failUnlessEqual(present, set())
    self.failUnlessEqual(bucket_writers, {})

    server_stats = server.get_stats()
    self.failUnlessEqual(
        server_stats["storage_server.accepting_immutable_shares"], 0)
    # Some platforms may not have an API to get disk stats.
    # But if there are stats, readonly_storage means disk_avail=0
    avail_key = "storage_server.disk_avail"
    if avail_key in server_stats:
        self.failUnlessEqual(server_stats[avail_key], 0)
677 def test_discard(self):
678 # discard is really only used for other tests, but we test it anyways
679 workdir = self.workdir("test_discard")
680 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
681 ss.setServiceParent(self.sparent)
683 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
684 self.failUnlessEqual(already, set())
685 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
686 for i,wb in writers.items():
687 wb.remote_write(0, "%25d" % i)
689 # since we discard the data, the shares should be present but sparse.
690 # Since we write with some seeks, the data we read back will be all
692 b = ss.remote_get_buckets("vid")
693 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
694 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
696 def test_advise_corruption(self):
697 workdir = self.workdir("test_advise_corruption")
698 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
699 ss.setServiceParent(self.sparent)
701 si0_s = base32.b2a("si0")
702 ss.remote_advise_corrupt_share("immutable", "si0", 0,
703 "This share smells funny.\n")
704 reportdir = os.path.join(workdir, "corruption-advisories")
705 reports = os.listdir(reportdir)
706 self.failUnlessEqual(len(reports), 1)
707 report_si0 = reports[0]
708 self.failUnlessIn(si0_s, report_si0)
709 f = open(os.path.join(reportdir, report_si0), "r")
712 self.failUnlessIn("type: immutable", report)
713 self.failUnlessIn("storage_index: %s" % si0_s, report)
714 self.failUnlessIn("share_number: 0", report)
715 self.failUnlessIn("This share smells funny.", report)
717 # test the RIBucketWriter version too
718 si1_s = base32.b2a("si1")
719 already,writers = self.allocate(ss, "si1", [1], 75)
720 self.failUnlessEqual(already, set())
721 self.failUnlessEqual(set(writers.keys()), set([1]))
722 writers[1].remote_write(0, "data")
723 writers[1].remote_close()
725 b = ss.remote_get_buckets("si1")
726 self.failUnlessEqual(set(b.keys()), set([1]))
727 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
729 reports = os.listdir(reportdir)
730 self.failUnlessEqual(len(reports), 2)
731 report_si1 = [r for r in reports if si1_s in r][0]
732 f = open(os.path.join(reportdir, report_si1), "r")
735 self.failUnlessIn("type: immutable", report)
736 self.failUnlessIn("storage_index: %s" % si1_s, report)
737 self.failUnlessIn("share_number: 1", report)
738 self.failUnlessIn("This share tastes like dust.", report)
742 class MutableServer(unittest.TestCase):
745 self.sparent = LoggingServiceParent()
746 self._lease_secret = itertools.count()
748 return self.sparent.stopService()
750 def workdir(self, name):
751 basedir = os.path.join("storage", "MutableServer", name)
754 def create(self, name):
755 workdir = self.workdir(name)
756 ss = StorageServer(workdir, "\x00" * 20)
757 ss.setServiceParent(self.sparent)
760 def test_create(self):
761 self.create("test_create")
def write_enabler(self, we_tag):
    """Return the tagged hash of *we_tag* used as a write enabler in tests."""
    hash_tag = "we_blah"
    return hashutil.tagged_hash(hash_tag, we_tag)
def renew_secret(self, tag):
    """Return the tagged hash of ``str(tag)`` used as a renew secret in tests."""
    # tag may be a non-string (callers pass counter values), hence str().
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return the tagged hash of ``str(tag)`` used as a cancel secret in tests."""
    # tag may be a non-string (callers pass counter values), hence str().
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
772 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
773 write_enabler = self.write_enabler(we_tag)
774 renew_secret = self.renew_secret(lease_tag)
775 cancel_secret = self.cancel_secret(lease_tag)
776 rstaraw = ss.remote_slot_testv_and_readv_and_writev
777 testandwritev = dict( [ (shnum, ([], [], None) )
778 for shnum in sharenums ] )
780 rc = rstaraw(storage_index,
781 (write_enabler, renew_secret, cancel_secret),
784 (did_write, readv_data) = rc
785 self.failUnless(did_write)
786 self.failUnless(isinstance(readv_data, dict))
787 self.failUnlessEqual(len(readv_data), 0)
789 def test_bad_magic(self):
790 ss = self.create("test_bad_magic")
791 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
792 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
797 read = ss.remote_slot_readv
798 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
799 read, "si1", [0], [(0,10)])
800 self.failUnlessIn(" had magic ", str(e))
801 self.failUnlessIn(" but we wanted ", str(e))
803 def test_container_size(self):
804 ss = self.create("test_container_size")
805 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
807 read = ss.remote_slot_readv
808 rstaraw = ss.remote_slot_testv_and_readv_and_writev
809 secrets = ( self.write_enabler("we1"),
810 self.renew_secret("we1"),
811 self.cancel_secret("we1") )
812 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
813 answer = rstaraw("si1", secrets,
814 {0: ([], [(0,data)], len(data)+12)},
816 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
818 # trying to make the container too large will raise an exception
819 TOOBIG = MutableShareFile.MAX_SIZE + 10
820 self.failUnlessRaises(DataTooLargeError,
821 rstaraw, "si1", secrets,
822 {0: ([], [(0,data)], TOOBIG)},
825 # it should be possible to make the container smaller, although at
826 # the moment this doesn't actually affect the share, unless the
827 # container size is dropped to zero, in which case the share is
829 answer = rstaraw("si1", secrets,
830 {0: ([], [(0,data)], len(data)+8)},
832 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
834 answer = rstaraw("si1", secrets,
835 {0: ([], [(0,data)], 0)},
837 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
839 read_answer = read("si1", [0], [(0,10)])
840 self.failUnlessEqual(read_answer, {})
842 def test_allocate(self):
843 ss = self.create("test_allocate")
844 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
847 read = ss.remote_slot_readv
848 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
850 self.failUnlessEqual(read("si1", [], [(0, 10)]),
851 {0: [""], 1: [""], 2: [""]})
852 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
856 secrets = ( self.write_enabler("we1"),
857 self.renew_secret("we1"),
858 self.cancel_secret("we1") )
859 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
860 write = ss.remote_slot_testv_and_readv_and_writev
861 answer = write("si1", secrets,
862 {0: ([], [(0,data)], None)},
864 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
866 self.failUnlessEqual(read("si1", [0], [(0,20)]),
867 {0: ["00000000001111111111"]})
868 self.failUnlessEqual(read("si1", [0], [(95,10)]),
870 #self.failUnlessEqual(s0.remote_get_length(), 100)
872 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
873 f = self.failUnlessRaises(BadWriteEnablerError,
874 write, "si1", bad_secrets,
876 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
878 # this testv should fail
879 answer = write("si1", secrets,
880 {0: ([(0, 12, "eq", "444444444444"),
881 (20, 5, "eq", "22222"),
888 self.failUnlessEqual(answer, (False,
889 {0: ["000000000011", "22222"],
893 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
896 answer = write("si1", secrets,
897 {0: ([(10, 5, "lt", "11111"),
904 self.failUnlessEqual(answer, (False,
909 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
912 def test_operators(self):
913 # test operators, the data we're comparing is '11111' in all cases.
914 # test both fail+pass, reset data after each one.
915 ss = self.create("test_operators")
917 secrets = ( self.write_enabler("we1"),
918 self.renew_secret("we1"),
919 self.cancel_secret("we1") )
920 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
921 write = ss.remote_slot_testv_and_readv_and_writev
922 read = ss.remote_slot_readv
925 write("si1", secrets,
926 {0: ([], [(0,data)], None)},
932 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
937 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
938 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
939 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
942 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
947 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
948 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
951 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
956 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
957 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
961 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
966 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
967 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
970 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
975 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
976 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
979 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
984 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
985 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
989 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
994 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
995 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
998 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1003 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1004 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1008 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1013 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1014 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1017 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1022 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1023 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1027 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1032 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1033 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1036 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1041 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1042 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1045 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1050 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1051 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1055 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1060 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1061 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1064 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1069 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1070 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1073 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1078 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1079 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1082 # finally, test some operators against empty shares
1083 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1088 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1089 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
def test_readv(self):
    # Write three distinct 100-byte shares into a single slot, then check
    # that a read which names no particular share (empty share list)
    # returns the first ten bytes of each of them.
    ss = self.create("test_readv")
    secrets = ( self.write_enabler("we1"),
                self.renew_secret("we1"),
                self.cancel_secret("we1") )
    write = ss.remote_slot_testv_and_readv_and_writev
    read = ss.remote_slot_readv
    # share number N is filled with the digit N
    data = [("%d" % i) * 100 for i in range(3)]
    rc = write("si1", secrets,
               {0: ([], [(0,data[0])], None),
                1: ([], [(0,data[1])], None),
                2: ([], [(0,data[2])], None),
                }, [])
    # no test vectors and no read vector: the write succeeds and nothing
    # is read back
    self.failUnlessEqual(rc, (True, {}))

    answer = read("si1", [], [(0, 10)])
    self.failUnlessEqual(answer, {0: ["0"*10],
                                  1: ["1"*10],
                                  2: ["2"*10]})
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """
    I assert that two lease lists have the same length and that leases
    at the same position agree on owner_num, renew_secret, cancel_secret
    and nodeid. Expiration times are deliberately not compared.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # lengths are already known to match, so zip() drops nothing
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """
    I assert that two lease lists have the same length and that leases
    at the same position agree on every field, including the
    expiration time.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # lengths are already known to match, so zip() drops nothing
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
1134 def test_leases(self):
1135 ss = self.create("test_leases")
1137 return ( self.write_enabler("we1"),
1138 self.renew_secret("we1-%d" % n),
1139 self.cancel_secret("we1-%d" % n) )
1140 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1141 write = ss.remote_slot_testv_and_readv_and_writev
1142 read = ss.remote_slot_readv
1143 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1144 self.failUnlessEqual(rc, (True, {}))
1146 # create a random non-numeric file in the bucket directory, to
1147 # exercise the code that's supposed to ignore those.
1148 bucket_dir = os.path.join(self.workdir("test_leases"),
1149 "shares", storage_index_to_dir("si1"))
1150 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1151 f.write("you ought to be ignoring me\n")
1154 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1155 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1157 # add-lease on a missing storage index is silently ignored
1158 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1160 # re-allocate the slots and use the same secrets, that should update
1162 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1163 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1166 ss.remote_renew_lease("si1", secrets(0)[1])
1167 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1169 # now allocate them with a bunch of different secrets, to trigger the
1170 # extended lease code. Use add_lease for one of them.
1171 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1172 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1173 secrets2 = secrets(2)
1174 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1175 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1176 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1177 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1178 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1180 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1182 # cancel one of them
1183 ss.remote_cancel_lease("si1", secrets(5)[2])
1184 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1186 all_leases = list(s0.get_leases())
1187 # and write enough data to expand the container, forcing the server
1188 # to move the leases
1189 write("si1", secrets(0),
1190 {0: ([], [(0,data)], 200), },
1193 # read back the leases, make sure they're still intact.
1194 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1196 ss.remote_renew_lease("si1", secrets(0)[1])
1197 ss.remote_renew_lease("si1", secrets(1)[1])
1198 ss.remote_renew_lease("si1", secrets(2)[1])
1199 ss.remote_renew_lease("si1", secrets(3)[1])
1200 ss.remote_renew_lease("si1", secrets(4)[1])
1201 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1202 # get a new copy of the leases, with the current timestamps. Reading
1203 # data and failing to renew/cancel leases should leave the timestamps
1205 all_leases = list(s0.get_leases())
1206 # renewing with a bogus token should prompt an error message
1208 # examine the exception thus raised, make sure the old nodeid is
1209 # present, to provide for share migration
1210 e = self.failUnlessRaises(IndexError,
1211 ss.remote_renew_lease, "si1",
1214 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1215 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1216 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1218 # same for cancelling
1219 self.failUnlessRaises(IndexError,
1220 ss.remote_cancel_lease, "si1",
1222 self.compare_leases(all_leases, list(s0.get_leases()))
1224 # reading shares should not modify the timestamp
1225 read("si1", [], [(0,200)])
1226 self.compare_leases(all_leases, list(s0.get_leases()))
1228 write("si1", secrets(0),
1229 {0: ([], [(200, "make me bigger")], None)}, [])
1230 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1232 write("si1", secrets(0),
1233 {0: ([], [(500, "make me really bigger")], None)}, [])
1234 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1236 # now cancel them all
1237 ss.remote_cancel_lease("si1", secrets(0)[2])
1238 ss.remote_cancel_lease("si1", secrets(1)[2])
1239 ss.remote_cancel_lease("si1", secrets(2)[2])
1240 ss.remote_cancel_lease("si1", secrets(3)[2])
1242 # the slot should still be there
1243 remaining_shares = read("si1", [], [(0,10)])
1244 self.failUnlessEqual(len(remaining_shares), 1)
1245 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1247 # cancelling a non-existent lease should raise an IndexError
1248 self.failUnlessRaises(IndexError,
1249 ss.remote_cancel_lease, "si1", "nonsecret")
1251 # and the slot should still be there
1252 remaining_shares = read("si1", [], [(0,10)])
1253 self.failUnlessEqual(len(remaining_shares), 1)
1254 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1256 ss.remote_cancel_lease("si1", secrets(4)[2])
1257 # now the slot should be gone
1258 no_shares = read("si1", [], [(0,10)])
1259 self.failUnlessEqual(no_shares, {})
1261 # cancelling a lease on a non-existent share should raise an IndexError
1262 self.failUnlessRaises(IndexError,
1263 ss.remote_cancel_lease, "si2", "nonsecret")
1265 def test_remove(self):
1266 ss = self.create("test_remove")
1267 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1269 readv = ss.remote_slot_readv
1270 writev = ss.remote_slot_testv_and_readv_and_writev
1271 secrets = ( self.write_enabler("we1"),
1272 self.renew_secret("we1"),
1273 self.cancel_secret("we1") )
1274 # delete sh0 by setting its size to zero
1275 answer = writev("si1", secrets,
1278 # the answer should mention all the shares that existed before the
1280 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1281 # but a new read should show only sh1 and sh2
1282 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1285 # delete sh1 by setting its size to zero
1286 answer = writev("si1", secrets,
1289 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1290 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1293 # delete sh2 by setting its size to zero
1294 answer = writev("si1", secrets,
1297 self.failUnlessEqual(answer, (True, {2:[]}) )
1298 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1300 # and the bucket directory should now be gone
1301 si = base32.b2a("si1")
1302 # note: this is a detail of the storage server implementation, and
1303 # may change in the future
1305 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1306 bucketdir = os.path.join(prefixdir, si)
1307 self.failUnless(os.path.exists(prefixdir), prefixdir)
1308 self.failIf(os.path.exists(bucketdir), bucketdir)
1311 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1313 self.sparent = LoggingServiceParent()
1314 self._lease_secret = itertools.count()
1315 self.ss = self.create("MDMFProxies storage test server")
1316 self.rref = RemoteBucket()
1317 self.rref.target = self.ss
1318 self.secrets = (self.write_enabler("we_secret"),
1319 self.renew_secret("renew_secret"),
1320 self.cancel_secret("cancel_secret"))
1321 self.segment = "aaaaaa"
1323 self.salt = "a" * 16
1324 self.block_hash = "a" * 32
1325 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1326 self.share_hash = self.block_hash
1327 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1328 self.signature = "foobarbaz"
1329 self.verification_key = "vvvvvv"
1330 self.encprivkey = "private"
1331 self.root_hash = self.block_hash
1332 self.salt_hash = self.root_hash
1333 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1334 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1335 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1336 # blockhashes and salt hashes are serialized in the same way,
1337 # only we lop off the first element and store that in the
1339 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1343 self.sparent.stopService()
1344 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
def write_enabler(self, we_tag):
    """
    I return the write enabler for we_tag: the tagged hash of we_tag
    under the "we_blah" tag.
    """
    enabler = hashutil.tagged_hash("we_blah", we_tag)
    return enabler
def renew_secret(self, tag):
    """
    I return the lease-renewal secret for tag: the tagged hash of
    str(tag) under the "renew_blah" tag.
    """
    tag_s = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_s)
def cancel_secret(self, tag):
    """
    I return the lease-cancel secret for tag: the tagged hash of
    str(tag) under the "cancel_blah" tag.
    """
    tag_s = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_s)
def workdir(self, name):
    """
    I return the relative path of the working directory used for the
    storage server called name: storage/MutableServer/<name>.
    """
    basedir = os.path.join("storage", "MutableServer", name)
    # callers join further path components onto this and rmtree it, so
    # the path has to be handed back
    return basedir
def create(self, name):
    """
    I build a StorageServer rooted at self.workdir(name), attach it to
    self.sparent as a child service, and return it.
    """
    workdir = self.workdir(name)
    # the second argument is a 20-byte all-zero string
    ss = StorageServer(workdir, "\x00" * 20)
    ss.setServiceParent(self.sparent)
    # callers keep the server around (self.ss = self.create(...))
    return ss
1371 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1372 # Start with the checkstring
1373 data = struct.pack(">BQ32s",
1377 self.checkstring = data
1378 # Next, the encoding parameters
1380 data += struct.pack(">BBQQ",
1386 data += struct.pack(">BBQQ",
1392 data += struct.pack(">BBQQ",
1397 # Now we'll build the offsets.
1399 if not tail_segment and not empty:
1401 sharedata += self.salt + self.block
1404 sharedata += self.salt + self.block
1405 sharedata += self.salt + "a"
1407 # The encrypted private key comes after the shares + salts
1408 offset_size = struct.calcsize(MDMFOFFSETS)
1409 encrypted_private_key_offset = len(data) + offset_size
1410 # The share has chain comes after the private key
1411 sharehashes_offset = encrypted_private_key_offset + \
1412 len(self.encprivkey)
1414 # The signature comes after the share hash chain.
1415 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1417 verification_key_offset = signature_offset + len(self.signature)
1418 verification_key_end = verification_key_offset + \
1419 len(self.verification_key)
1421 share_data_offset = offset_size
1422 share_data_offset += PRIVATE_KEY_SIZE
1423 share_data_offset += SIGNATURE_SIZE
1424 share_data_offset += VERIFICATION_KEY_SIZE
1425 share_data_offset += SHARE_HASH_CHAIN_SIZE
1427 blockhashes_offset = share_data_offset + len(sharedata)
1428 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1430 data += struct.pack(MDMFOFFSETS,
1431 encrypted_private_key_offset,
1434 verification_key_offset,
1435 verification_key_end,
1441 self.offsets['enc_privkey'] = encrypted_private_key_offset
1442 self.offsets['block_hash_tree'] = blockhashes_offset
1443 self.offsets['share_hash_chain'] = sharehashes_offset
1444 self.offsets['signature'] = signature_offset
1445 self.offsets['verification_key'] = verification_key_offset
1446 self.offsets['share_data'] = share_data_offset
1447 self.offsets['verification_key_end'] = verification_key_end
1448 self.offsets['EOF'] = eof_offset
1451 data += self.encprivkey
1453 data += self.share_hash_chain_s
1455 data += self.signature
1456 # and the verification key
1457 data += self.verification_key
1458 # Then we'll add in gibberish until we get to the right point.
1459 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1462 # Then the share data
1465 data += self.block_hash_tree_s
1469 def write_test_share_to_server(self,
1474 I write some data for the read tests to read to self.ss
1476 If tail_segment=True, then I will write a share that has a
1477 smaller tail segment than other segments.
1479 write = self.ss.remote_slot_testv_and_readv_and_writev
1480 data = self.build_test_mdmf_share(tail_segment, empty)
1481 # Finally, we write the whole thing to the storage server in one
1483 testvs = [(0, 1, "eq", "")]
1485 tws[0] = (testvs, [(0, data)], None)
1487 results = write(storage_index, self.secrets, tws, readv)
1488 self.failUnless(results[0])
1491 def build_test_sdmf_share(self, empty=False):
1495 sharedata = self.segment * 6
1496 self.sharedata = sharedata
1497 blocksize = len(sharedata) / 3
1498 block = sharedata[:blocksize]
1499 self.blockdata = block
1500 prefix = struct.pack(">BQ32s16s BBQQ",
1510 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1511 signature_offset = post_offset + len(self.verification_key)
1512 sharehashes_offset = signature_offset + len(self.signature)
1513 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1514 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1515 encprivkey_offset = sharedata_offset + len(block)
1516 eof_offset = encprivkey_offset + len(self.encprivkey)
1517 offsets = struct.pack(">LLLLQQ",
1524 final_share = "".join([prefix,
1526 self.verification_key,
1528 self.share_hash_chain_s,
1529 self.block_hash_tree_s,
1533 self.offsets['signature'] = signature_offset
1534 self.offsets['share_hash_chain'] = sharehashes_offset
1535 self.offsets['block_hash_tree'] = blockhashes_offset
1536 self.offsets['share_data'] = sharedata_offset
1537 self.offsets['enc_privkey'] = encprivkey_offset
1538 self.offsets['EOF'] = eof_offset
1542 def write_sdmf_share_to_server(self,
1545 # Some tests need SDMF shares to verify that we can still
1546 # read them. This method writes one, which resembles but is not
1548 write = self.ss.remote_slot_testv_and_readv_and_writev
1549 share = self.build_test_sdmf_share(empty)
1550 testvs = [(0, 1, "eq", "")]
1552 tws[0] = (testvs, [(0, share)], None)
1554 results = write(storage_index, self.secrets, tws, readv)
1555 self.failUnless(results[0])
def test_read(self):
    # Write a canned MDMF share to the server, then read every field
    # back through an MDMFSlotReadProxy and compare it against the
    # values the share was built from.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # Check that every method equals what we expect it to.
    d = defer.succeed(None)
    def _check_block_and_salt(results):
        block, salt = results
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)

    for i in xrange(6):
        # i=i freezes the segment number for this callback; a plain
        # closure would see only the final value of the loop variable.
        d.addCallback(lambda ignored, i=i:
            mr.get_block_and_salt(i))
        d.addCallback(_check_block_and_salt)

    d.addCallback(lambda ignored:
        mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
        self.failUnlessEqual(self.encprivkey, encprivkey))

    d.addCallback(lambda ignored:
        mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
        self.failUnlessEqual(self.block_hash_tree, blockhashes))

    d.addCallback(lambda ignored:
        mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
        self.failUnlessEqual(self.share_hash_chain, sharehashes))

    d.addCallback(lambda ignored:
        mr.get_signature())
    d.addCallback(lambda signature:
        self.failUnlessEqual(signature, self.signature))

    d.addCallback(lambda ignored:
        mr.get_verification_key())
    d.addCallback(lambda verification_key:
        self.failUnlessEqual(verification_key, self.verification_key))

    d.addCallback(lambda ignored:
        mr.get_seqnum())
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0))

    d.addCallback(lambda ignored:
        mr.get_root_hash())
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(self.root_hash, root_hash))

    # ask for the sequence number a second time
    d.addCallback(lambda ignored:
        mr.get_seqnum())
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(0, seqnum))

    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)

    d.addCallback(lambda ignored:
        mr.get_checkstring())
    # compare against the checkstring recorded when the share was
    # built; comparing the result with itself can never fail
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, self.checkstring))
    # trial only waits for the callbacks if it is given the Deferred
    return d
def test_read_with_different_tail_segment_size(self):
    # Write a share whose tail segment is smaller than the others and
    # check that the last block (index 5) comes back as the single byte
    # "a".
    self.write_test_share_to_server("si1", tail_segment=True)
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = mr.get_block_and_salt(5)
    def _check_tail_segment(results):
        block, salt = results
        self.failUnlessEqual(len(block), 1)
        self.failUnlessEqual(block, "a")
    d.addCallback(_check_tail_segment)
    # trial only waits for the callback if it is given the Deferred
    return d
def test_get_block_with_invalid_segnum(self):
    # Asking the read proxy for segment 7 must fail with LayoutInvalid.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # shouldFail takes (expected failure, description, substring,
    # callable, *args); None means the failure text is not checked.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test invalid segnum",
                        None,
                        mr.get_block_and_salt, 7))
    # trial only waits for the callback if it is given the Deferred
    return d
def test_get_encoding_parameters_first(self):
    # Fetch the encoding parameters as the very first call on a fresh
    # read proxy and check them against the values in the test share.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = mr.get_encoding_parameters()
    def _check_encoding_parameters(params):
        k, n, segment_size, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segment_size, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)
    # trial only waits for the callback if it is given the Deferred
    return d
def test_get_seqnum_first(self):
    # Fetch the sequence number as the very first call on a fresh read
    # proxy; the test share is written with sequence number 0.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = mr.get_seqnum()
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0))
    # trial only waits for the callback if it is given the Deferred
    return d
def test_get_root_hash_first(self):
    # Fetch the root hash as the very first call on a fresh read proxy
    # and compare it with the one the test share was built from.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = mr.get_root_hash()
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(root_hash, self.root_hash))
    # trial only waits for the callback if it is given the Deferred
    return d
def test_get_checkstring_first(self):
    # Fetch the checkstring as the very first call on a fresh read proxy
    # and compare it with the one recorded when the share was built.
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = mr.get_checkstring()
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, self.checkstring))
    # trial only waits for the callback if it is given the Deferred
    return d
def test_write_read_vectors(self):
    # When writing for us, the storage server will return to us a
    # read vector, along with its result. If a write fails because
    # the test vectors failed, this read vector can help us to
    # diagnose the problem. This test ensures that the read vector
    # is working appropriately.
    mw = self._make_new_mw("si1", 0)

    for i in xrange(6):
        mw.put_block(self.block, i, self.salt)
    mw.put_encprivkey(self.encprivkey)
    mw.put_blockhashes(self.block_hash_tree)
    mw.put_sharehashes(self.share_hash_chain)
    mw.put_root_hash(self.root_hash)
    mw.put_signature(self.signature)
    mw.put_verification_key(self.verification_key)
    d = mw.finish_publishing()
    def _then(results):
        # failUnless(len(results), 2) would treat 2 as the failure
        # message and pass for any non-empty result; compare instead.
        self.failUnlessEqual(len(results), 2)
        result, readv = results
        self.failUnless(result)
        self.failIf(readv)
        # Remember the real checkstring, then deliberately give the
        # writer a wrong one so that the next publish is rejected.
        self.old_checkstring = mw.get_checkstring()
        mw.set_checkstring("")
    d.addCallback(_then)
    d.addCallback(lambda ignored:
        mw.finish_publishing())
    def _then_again(results):
        self.failUnlessEqual(len(results), 2)
        result, readvs = results
        self.failIf(result)
        # the rejected write must report what share 0 really holds
        self.failUnlessIn(0, readvs)
        readv = readvs[0][0]
        self.failUnlessEqual(readv, self.old_checkstring)
    d.addCallback(_then_again)
    # The checkstring remains the same for the rest of the process.
    return d
1730 def test_private_key_after_share_hash_chain(self):
1731 mw = self._make_new_mw("si1", 0)
1732 d = defer.succeed(None)
1734 d.addCallback(lambda ignored, i=i:
1735 mw.put_block(self.block, i, self.salt))
1736 d.addCallback(lambda ignored:
1737 mw.put_encprivkey(self.encprivkey))
1738 d.addCallback(lambda ignored:
1739 mw.put_sharehashes(self.share_hash_chain))
1741 # Now try to put the private key again.
1742 d.addCallback(lambda ignored:
1743 self.shouldFail(LayoutInvalid, "test repeat private key",
1745 mw.put_encprivkey, self.encprivkey))
1749 def test_signature_after_verification_key(self):
1750 mw = self._make_new_mw("si1", 0)
1751 d = defer.succeed(None)
1752 # Put everything up to and including the verification key.
1754 d.addCallback(lambda ignored, i=i:
1755 mw.put_block(self.block, i, self.salt))
1756 d.addCallback(lambda ignored:
1757 mw.put_encprivkey(self.encprivkey))
1758 d.addCallback(lambda ignored:
1759 mw.put_blockhashes(self.block_hash_tree))
1760 d.addCallback(lambda ignored:
1761 mw.put_sharehashes(self.share_hash_chain))
1762 d.addCallback(lambda ignored:
1763 mw.put_root_hash(self.root_hash))
1764 d.addCallback(lambda ignored:
1765 mw.put_signature(self.signature))
1766 d.addCallback(lambda ignored:
1767 mw.put_verification_key(self.verification_key))
1768 # Now try to put the signature again. This should fail
1769 d.addCallback(lambda ignored:
1770 self.shouldFail(LayoutInvalid, "signature after verification",
1772 mw.put_signature, self.signature))
1776 def test_uncoordinated_write(self):
1777 # Make two mutable writers, both pointing to the same storage
1778 # server, both at the same storage index, and try writing to the
1780 mw1 = self._make_new_mw("si1", 0)
1781 mw2 = self._make_new_mw("si1", 0)
1783 def _check_success(results):
1784 result, readvs = results
1785 self.failUnless(result)
1787 def _check_failure(results):
1788 result, readvs = results
1791 def _write_share(mw):
1793 mw.put_block(self.block, i, self.salt)
1794 mw.put_encprivkey(self.encprivkey)
1795 mw.put_blockhashes(self.block_hash_tree)
1796 mw.put_sharehashes(self.share_hash_chain)
1797 mw.put_root_hash(self.root_hash)
1798 mw.put_signature(self.signature)
1799 mw.put_verification_key(self.verification_key)
1800 return mw.finish_publishing()
1801 d = _write_share(mw1)
1802 d.addCallback(_check_success)
1803 d.addCallback(lambda ignored:
1805 d.addCallback(_check_failure)
def test_invalid_salt_size(self):
    # Salts need to be 16 bytes in size. Writes that attempt to
    # write more or less than this should be rejected.
    mw = self._make_new_mw("si1", 0)
    invalid_salt = "a" * 17 # 17 bytes
    another_invalid_salt = "b" * 15 # 15 bytes
    d = defer.succeed(None)
    # shouldFail takes (expected failure, description, substring,
    # callable, *args); None means the failure text is not checked.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "salt too big",
                        None,
                        mw.put_block, self.block, 0, invalid_salt))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "salt too small",
                        None,
                        mw.put_block, self.block, 0,
                        another_invalid_salt))
    # trial only waits for the callbacks if it is given the Deferred
    return d
def test_write_test_vectors(self):
    # If we give the write proxy a bogus test vector at
    # any point during the process, it should fail to write when we
    # tell it to write.
    def _check_failure(results):
        self.failUnlessEqual(len(results), 2)
        res, d = results
        self.failIf(res)

    def _check_success(results):
        self.failUnlessEqual(len(results), 2)
        res, d = results
        # check the success flag itself; the two-element results tuple
        # is always truthy, so asserting on it proves nothing
        self.failUnless(res)

    mw = self._make_new_mw("si1", 0)
    mw.set_checkstring("this is a lie")
    for i in xrange(6):
        mw.put_block(self.block, i, self.salt)
    mw.put_encprivkey(self.encprivkey)
    mw.put_blockhashes(self.block_hash_tree)
    mw.put_sharehashes(self.share_hash_chain)
    mw.put_root_hash(self.root_hash)
    mw.put_signature(self.signature)
    mw.put_verification_key(self.verification_key)
    d = mw.finish_publishing()
    d.addCallback(_check_failure)
    # with the bogus checkstring replaced by the empty one, the same
    # publish is expected to go through
    d.addCallback(lambda ignored:
        mw.set_checkstring(""))
    d.addCallback(lambda ignored:
        mw.finish_publishing())
    d.addCallback(_check_success)
    # trial only waits for the callbacks if it is given the Deferred
    return d
def serialize_blockhashes(self, blockhashes):
    """
    I return the given hashes concatenated, in order, into a single
    string with nothing between them.
    """
    separator = ""
    serialized = separator.join(blockhashes)
    return serialized
def serialize_sharehashes(self, sharehashes):
    """
    I serialize a dict mapping index -> hash into a single string.

    Entries are emitted in ascending index order. Each entry is packed
    as ">H32s": the index as a big-endian unsigned 16-bit integer,
    followed by the hash in a 32-byte field.
    """
    ret = "".join([struct.pack(">H32s", i, sharehashes[i])
                   for i in sorted(sharehashes)])
    # callers take len() of the result and splice it into share data,
    # so the serialized string has to be handed back
    return ret
1872 def test_write(self):
1873 # This translates to a file with 6 6-byte segments, and with 2-byte
1875 mw = self._make_new_mw("si1", 0)
1876 # Test writing some blocks.
1877 read = self.ss.remote_slot_readv
1878 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1879 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1880 PRIVATE_KEY_SIZE + \
1882 VERIFICATION_KEY_SIZE + \
1883 SHARE_HASH_CHAIN_SIZE
1884 written_block_size = 2 + len(self.salt)
1885 written_block = self.block + self.salt
1887 mw.put_block(self.block, i, self.salt)
1889 mw.put_encprivkey(self.encprivkey)
1890 mw.put_blockhashes(self.block_hash_tree)
1891 mw.put_sharehashes(self.share_hash_chain)
1892 mw.put_root_hash(self.root_hash)
1893 mw.put_signature(self.signature)
1894 mw.put_verification_key(self.verification_key)
1895 d = mw.finish_publishing()
1896 def _check_publish(results):
1897 self.failUnlessEqual(len(results), 2)
1898 result, ign = results
1899 self.failUnless(result, "publish failed")
1901 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1902 {0: [written_block]})
1904 self.failUnlessEqual(len(self.encprivkey), 7)
1905 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1906 {0: [self.encprivkey]})
1908 expected_block_hash_offset = expected_sharedata_offset + \
1909 (6 * written_block_size)
1910 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1911 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1912 {0: [self.block_hash_tree_s]})
1914 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1915 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1916 {0: [self.share_hash_chain_s]})
1918 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1919 {0: [self.root_hash]})
1920 expected_signature_offset = expected_share_hash_offset + \
1921 len(self.share_hash_chain_s)
1922 self.failUnlessEqual(len(self.signature), 9)
1923 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1924 {0: [self.signature]})
1926 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1927 self.failUnlessEqual(len(self.verification_key), 6)
1928 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1929 {0: [self.verification_key]})
1931 signable = mw.get_signable()
1932 verno, seq, roothash, k, n, segsize, datalen = \
1933 struct.unpack(">BQ32sBBQQ",
1935 self.failUnlessEqual(verno, 1)
1936 self.failUnlessEqual(seq, 0)
1937 self.failUnlessEqual(roothash, self.root_hash)
1938 self.failUnlessEqual(k, 3)
1939 self.failUnlessEqual(n, 10)
1940 self.failUnlessEqual(segsize, 6)
1941 self.failUnlessEqual(datalen, 36)
1942 expected_eof_offset = expected_block_hash_offset + \
1943 len(self.block_hash_tree_s)
1945 # Check the version number to make sure that it is correct.
1946 expected_version_number = struct.pack(">B", 1)
1947 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1948 {0: [expected_version_number]})
1949 # Check the sequence number to make sure that it is correct
1950 expected_sequence_number = struct.pack(">Q", 0)
1951 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1952 {0: [expected_sequence_number]})
1953 # Check that the encoding parameters (k, N, segement size, data
1954 # length) are what they should be. These are 3, 10, 6, 36
1955 expected_k = struct.pack(">B", 3)
1956 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1958 expected_n = struct.pack(">B", 10)
1959 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1961 expected_segment_size = struct.pack(">Q", 6)
1962 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1963 {0: [expected_segment_size]})
1964 expected_data_length = struct.pack(">Q", 36)
1965 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1966 {0: [expected_data_length]})
1967 expected_offset = struct.pack(">Q", expected_private_key_offset)
1968 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1969 {0: [expected_offset]})
1970 expected_offset = struct.pack(">Q", expected_share_hash_offset)
1971 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1972 {0: [expected_offset]})
1973 expected_offset = struct.pack(">Q", expected_signature_offset)
1974 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1975 {0: [expected_offset]})
1976 expected_offset = struct.pack(">Q", expected_verification_key_offset)
1977 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1978 {0: [expected_offset]})
1979 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1980 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1981 {0: [expected_offset]})
1982 expected_offset = struct.pack(">Q", expected_sharedata_offset)
1983 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1984 {0: [expected_offset]})
1985 expected_offset = struct.pack(">Q", expected_block_hash_offset)
1986 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1987 {0: [expected_offset]})
1988 expected_offset = struct.pack(">Q", expected_eof_offset)
1989 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1990 {0: [expected_offset]})
1991 d.addCallback(_check_publish)
1994 def _make_new_mw(self, si, share, datalength=36):
1995 # This is a file of size 36 bytes. Since it has a segment
1996 # size of 6, we know that it has 6 byte segments, which will
1997 # be split into blocks of 2 bytes because our FEC k
1999 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
# Checks that put_block fails with LayoutInvalid when asked to write a
# segment number beyond the segments the writer was configured for.
2004 def test_write_rejected_with_too_many_blocks(self):
2005 mw = self._make_new_mw("si0", 0)
2007 # Try writing too many blocks. We should not be able to write
2009 # blocks into each share.
2010 d = defer.succeed(None)
# i=i stores the current index as a default argument, so each callback
# keeps its own segment number even if it runs after the loop has moved on.
2012 d.addCallback(lambda ignored, i=i:
2013 mw.put_block(self.block, i, self.salt))
# NOTE(review): the rejected write targets segment 7. If the valid segments
# are 0-5 (a 36-byte file in 6-byte segments, per _make_new_mw), index 6 is
# the tighter boundary case -- consider testing it instead or as well.
2014 d.addCallback(lambda ignored:
2015 self.shouldFail(LayoutInvalid, "too many blocks",
2017 mw.put_block, self.block, 7, self.salt))
def test_write_rejected_with_invalid_salt(self):
    """put_block must reject salts that are not exactly 16 bytes long.

    The writer from _make_new_mw describes a 36-byte file cut into
    6-byte segments, so segment 0 is a valid target. Writing to a valid
    segment means the salt is the only thing wrong with each call. An
    out-of-range segment number (as test_write_rejected_with_too_many_blocks
    shows) is rejected with LayoutInvalid on its own, which would let
    this test pass without the salt ever being checked.

    Returns a Deferred that fires once both bad salts were rejected.
    """
    mw = self._make_new_mw("si1", 0)
    d = defer.succeed(None)
    # "Any more or less": exercise both sides of the 16-byte requirement.
    for bad_salt in ("a" * 17, "a" * 15):
        # bad_salt=bad_salt pins the value for this particular callback.
        d.addCallback(lambda ignored, bad_salt=bad_salt:
                      self.shouldFail(LayoutInvalid, "test_invalid_salt",
                                      None, mw.put_block,
                                      self.block, 0, bad_salt))
    return d
# Checks that put_root_hash fails with LayoutInvalid when given a root hash
# of the wrong length (17 bytes instead of 32).
2033 def test_write_rejected_with_invalid_root_hash(self):
2034 # Try writing an invalid root hash. This should be SHA256d, and
2035 # 32 bytes long as a result.
2036 mw = self._make_new_mw("si2", 0)
2037 # 17 bytes != 32 bytes
2038 invalid_root_hash = "a" * 17
2039 d = defer.succeed(None)
2040 # Before this test can work, we need to put some blocks + salts,
2041 # a block hash tree, and a share hash tree. Otherwise, we'll see
2042 # failures that match what we are looking for, but are caused by
2043 # the constraints imposed on operation ordering.
# i=i stores the current index as a default argument for this callback.
2045 d.addCallback(lambda ignored, i=i:
2046 mw.put_block(self.block, i, self.salt))
2047 d.addCallback(lambda ignored:
2048 mw.put_encprivkey(self.encprivkey))
2049 d.addCallback(lambda ignored:
2050 mw.put_blockhashes(self.block_hash_tree))
2051 d.addCallback(lambda ignored:
2052 mw.put_sharehashes(self.share_hash_chain))
# With the prerequisites written, the length of the hash is the only thing
# wrong with this call.
2053 d.addCallback(lambda ignored:
2054 self.shouldFail(LayoutInvalid, "invalid root hash",
2055 None, mw.put_root_hash, invalid_root_hash))
# Checks that put_block fails with LayoutInvalid for blocks that are too
# small or too large, and for a full-size block written to the shorter tail
# segment, and then writes a block named valid_block to that tail segment.
2059 def test_write_rejected_with_invalid_blocksize(self):
2060 # The blocksize implied by the writer that we get from
2061 # _make_new_mw is 2bytes -- any more or any less than this
2062 # should be cause for failure, unless it is the tail segment, in
2063 # which case it may not be failure.
# datalength=33 instead of the default 36, so the last segment is shorter
# than the others.
2065 mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2067 # 1 bytes != 2 bytes
2068 d = defer.succeed(None)
# invalid_block is bound as a default argument here because the name is
# rebound to a longer string just below; the default keeps the short value
# for this callback.
2069 d.addCallback(lambda ignored, invalid_block=invalid_block:
2070 self.shouldFail(LayoutInvalid, "test blocksize too small",
2071 None, mw.put_block, invalid_block, 0,
2073 invalid_block = invalid_block * 3
2074 # 3 bytes != 2 bytes
# This lambda closes over invalid_block, so it sees the tripled value.
2075 d.addCallback(lambda ignored:
2076 self.shouldFail(LayoutInvalid, "test blocksize too large",
2078 mw.put_block, invalid_block, 0, self.salt))
2080 d.addCallback(lambda ignored, i=i:
2081 mw.put_block(self.block, i, self.salt))
2082 # Try to put an invalid tail segment
# self.block is a full-size block, written here to segment 5 (the tail).
2083 d.addCallback(lambda ignored:
2084 self.shouldFail(LayoutInvalid, "test invalid tail segment",
2086 mw.put_block, self.block, 5, self.salt))
2088 d.addCallback(lambda ignored:
2089 mw.put_block(valid_block, 5, self.salt))
# Walks one writer (mw0) through the publish steps and checks that steps
# attempted before their prerequisites fail with LayoutInvalid, while the
# same steps are accepted once the prerequisites have been written.
2093 def test_write_enforces_order_constraints(self):
2094 # We require that the MDMFSlotWriteProxy be interacted with in a
2098 # 1: write blocks and salts
2099 # 2: Write the encrypted private key
2100 # 3: Write the block hashes
2101 # 4: Write the share hashes
2102 # 5: Write the root hash and salt hash
2103 # 6: Write the signature and verification key
2104 # 7: Write the file.
2106 # Some of these can be performed out-of-order, and some can't.
2107 # The dependencies that I want to test here are:
2108 # - Private key before block hashes
2109 # - share hashes and block hashes before root hash
2110 # - root hash before signature
2111 # - signature before verification key
2112 mw0 = self._make_new_mw("si0", 0)
2114 d = defer.succeed(None)
# i=i stores the current index as a default argument for this callback.
2116 d.addCallback(lambda ignored, i=i:
2117 mw0.put_block(self.block, i, self.salt))
2119 # Try to write the share hash chain without writing the
2120 # encrypted private key
2121 d.addCallback(lambda ignored:
2122 self.shouldFail(LayoutInvalid, "share hash chain before "
2125 mw0.put_sharehashes, self.share_hash_chain))
2126 # Write the private key.
2127 d.addCallback(lambda ignored:
2128 mw0.put_encprivkey(self.encprivkey))
2130 # Now write the block hashes and try again
2131 d.addCallback(lambda ignored:
2132 mw0.put_blockhashes(self.block_hash_tree))
2134 # We haven't yet put the root hash on the share, so we shouldn't
2135 # be able to sign it.
2136 d.addCallback(lambda ignored:
2137 self.shouldFail(LayoutInvalid, "signature before root hash",
2138 None, mw0.put_signature, self.signature))
# get_signable is checked with failUnlessRaises rather than shouldFail, i.e.
# it is expected to raise LayoutInvalid directly when called.
2140 d.addCallback(lambda ignored:
2141 self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2143 # ..and, since that fails, we also shouldn't be able to put the
2145 d.addCallback(lambda ignored:
2146 self.shouldFail(LayoutInvalid, "key before signature",
2147 None, mw0.put_verification_key,
2148 self.verification_key))
2150 # Now write the share hashes.
2151 d.addCallback(lambda ignored:
2152 mw0.put_sharehashes(self.share_hash_chain))
2153 # We should be able to write the root hash now too
2154 d.addCallback(lambda ignored:
2155 mw0.put_root_hash(self.root_hash))
2157 # We should still be unable to put the verification key
2158 d.addCallback(lambda ignored:
2159 self.shouldFail(LayoutInvalid, "key before signature",
2160 None, mw0.put_verification_key,
2161 self.verification_key))
2163 d.addCallback(lambda ignored:
2164 mw0.put_signature(self.signature))
2166 # We shouldn't be able to write the offsets to the remote server
2167 # until the offset table is finished; IOW, until we have written
2168 # the verification key.
2169 d.addCallback(lambda ignored:
2170 self.shouldFail(LayoutInvalid, "offsets before verification key",
2172 mw0.finish_publishing))
2174 d.addCallback(lambda ignored:
2175 mw0.put_verification_key(self.verification_key))
def test_end_to_end(self):
    """Write a complete MDMF share, then read every field back.

    A writer from _make_new_mw publishes six blocks plus the encrypted
    private key, both hash structures, the root hash, the signature and
    the verification key. An MDMFSlotReadProxy on the same slot must
    then return each of those values, sequence number 0, the encoding
    parameters (k=3, n=10, segsize=6, datalen=36) and the same
    checkstring the writer reports.

    Returns the Deferred that drives the whole write/read chain.
    """
    mw = self._make_new_mw("si1", 0)
    d = defer.succeed(None)
    for i in xrange(6):
        # i=i pins the segment number for this particular callback.
        d.addCallback(lambda ignored, i=i:
                      mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
                  mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
                  mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
                  mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
                  mw.put_root_hash(self.root_hash))
    d.addCallback(lambda ignored:
                  mw.put_signature(self.signature))
    d.addCallback(lambda ignored:
                  mw.put_verification_key(self.verification_key))
    d.addCallback(lambda ignored:
                  mw.finish_publishing())

    mr = MDMFSlotReadProxy(self.rref, "si1", 0)

    # The check callbacks take a single argument and unpack it in the
    # body, like the other reader tests in this class; tuple parameters
    # in a def signature are Python 2-only syntax (PEP 3113).
    def _check_block_and_salt(results):
        block, salt = results
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)

    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
                      mr.get_block_and_salt(i))
        d.addCallback(_check_block_and_salt)

    d.addCallback(lambda ignored:
                  mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
                  self.failUnlessEqual(self.encprivkey, encprivkey))

    d.addCallback(lambda ignored:
                  mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
                  self.failUnlessEqual(self.block_hash_tree, blockhashes))

    d.addCallback(lambda ignored:
                  mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
                  self.failUnlessEqual(self.share_hash_chain, sharehashes))

    d.addCallback(lambda ignored:
                  mr.get_signature())
    d.addCallback(lambda signature:
                  self.failUnlessEqual(signature, self.signature))

    d.addCallback(lambda ignored:
                  mr.get_verification_key())
    d.addCallback(lambda verification_key:
                  self.failUnlessEqual(verification_key,
                                       self.verification_key))

    d.addCallback(lambda ignored:
                  mr.get_seqnum())
    d.addCallback(lambda seqnum:
                  self.failUnlessEqual(seqnum, 0))

    d.addCallback(lambda ignored:
                  mr.get_root_hash())
    d.addCallback(lambda root_hash:
                  self.failUnlessEqual(self.root_hash, root_hash))

    d.addCallback(lambda ignored:
                  mr.get_encoding_parameters())

    def _check_encoding_parameters(params):
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)

    d.addCallback(lambda ignored:
                  mr.get_checkstring())
    d.addCallback(lambda checkstring:
                  self.failUnlessEqual(checkstring, mw.get_checkstring()))
    return d
# Writes an SDMF share to the server and checks that the value passed to
# the `issdmf` callback for a reader on that share is true.
2263 def test_is_sdmf(self):
2264 # The MDMFSlotReadProxy should also know how to read SDMF files,
2265 # since it will encounter them on the grid. Callers use the
2266 # is_sdmf method to test this.
2267 self.write_sdmf_share_to_server("si1")
2268 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2270 d.addCallback(lambda issdmf:
2271 self.failUnless(issdmf))
# Writes an SDMF share to the server and checks that MDMFSlotReadProxy
# returns the expected block and salt, hash structures, private key,
# verification key, signature, sequence number and root hash for it.
2275 def test_reads_sdmf(self):
2276 # The slot read proxy should, naturally, know how to tell us
2277 # about data in the SDMF format
2278 self.write_sdmf_share_to_server("si1")
2279 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2280 d = defer.succeed(None)
2281 d.addCallback(lambda ignored:
2283 d.addCallback(lambda issdmf:
2284 self.failUnless(issdmf))
2286 # What do we need to read?
2289 d.addCallback(lambda ignored:
2290 mr.get_block_and_salt(0))
2291 def _check_block_and_salt(results):
2292 block, salt = results
2293 # Our original file is 36 bytes long. Then each share is 12
2294 # bytes in size. The share is composed entirely of the
2295 # letter a. self.block contains 2 as, so 6 * self.block is
2296 # what we are looking for.
2297 self.failUnlessEqual(block, self.block * 6)
2298 self.failUnlessEqual(salt, self.salt)
2299 d.addCallback(_check_block_and_salt)
2302 d.addCallback(lambda ignored:
2303 mr.get_blockhashes())
2304 d.addCallback(lambda blockhashes:
2305 self.failUnlessEqual(self.block_hash_tree,
2309 d.addCallback(lambda ignored:
2310 mr.get_sharehashes())
2311 d.addCallback(lambda sharehashes:
2312 self.failUnlessEqual(self.share_hash_chain,
2315 d.addCallback(lambda ignored:
2316 mr.get_encprivkey())
# The third argument to failUnlessEqual is the failure message; here it is
# the value that was actually read, so a mismatch shows what came back.
2317 d.addCallback(lambda encprivkey:
2318 self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2319 d.addCallback(lambda ignored:
2320 mr.get_verification_key())
2321 d.addCallback(lambda verification_key:
2322 self.failUnlessEqual(verification_key,
2323 self.verification_key,
2326 d.addCallback(lambda ignored:
2328 d.addCallback(lambda signature:
2329 self.failUnlessEqual(signature, self.signature, signature))
2331 # - The sequence number
2332 d.addCallback(lambda ignored:
2334 d.addCallback(lambda seqnum:
2335 self.failUnlessEqual(seqnum, 0, seqnum))
2338 d.addCallback(lambda ignored:
2340 d.addCallback(lambda root_hash:
2341 self.failUnlessEqual(root_hash, self.root_hash, root_hash))
# Checks that asking the reader for segment 1 of an SDMF share fails with
# LayoutInvalid.
2345 def test_only_reads_one_segment_sdmf(self):
2346 # SDMF shares have only one segment, so it doesn't make sense to
2347 # read more segments than that. The reader should know this and
2348 # complain if we try to do that.
2349 self.write_sdmf_share_to_server("si1")
2350 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2351 d = defer.succeed(None)
2352 d.addCallback(lambda ignored:
2354 d.addCallback(lambda issdmf:
2355 self.failUnless(issdmf))
# Segment numbers start at 0 (other tests read segment 0), so 1 is the first
# segment that a one-segment share does not have.
2356 d.addCallback(lambda ignored:
2357 self.shouldFail(LayoutInvalid, "test bad segment",
2359 mr.get_block_and_salt, 1))
# Checks how a reader built with a prefetched prefix of an MDMF share
# behaves: the version-info checks must need no remote read
# (self.rref.read_count stays 0), and the block-and-salt check expects a
# total of exactly one remote read.
2363 def test_read_with_prefetched_mdmf_data(self):
2364 # The MDMFSlotReadProxy will prefill certain fields if you pass
2365 # it data that you have already fetched. This is useful for
2366 # cases like the Servermap, which prefetches ~2kb of data while
2367 # finding out which shares are on the remote peer so that it
2368 # doesn't waste round trips.
2369 mdmf_data = self.build_test_mdmf_share()
2370 self.write_test_share_to_server("si1")
# Callback helper: builds a reader that is handed the first `length` bytes
# of the share as already-fetched data. `ignored` is the previous callback's
# result.
2371 def _make_mr(ignored, length):
2372 mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2375 d = defer.succeed(None)
2376 # This should be enough to fill in both the encoding parameters
2377 # and the table of offsets, which will complete the version
2378 # information tuple.
2379 d.addCallback(_make_mr, 123)
2380 d.addCallback(lambda mr:
2382 def _check_verinfo(verinfo):
2383 self.failUnless(verinfo)
2384 self.failUnlessEqual(len(verinfo), 9)
2394 self.failUnlessEqual(seqnum, 0)
2395 self.failUnlessEqual(root_hash, self.root_hash)
2396 self.failUnlessEqual(segsize, 6)
2397 self.failUnlessEqual(datalen, 36)
2398 self.failUnlessEqual(k, 3)
2399 self.failUnlessEqual(n, 10)
2400 expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2408 self.failUnlessEqual(expected_prefix, prefix)
2409 self.failUnlessEqual(self.rref.read_count, 0)
2410 d.addCallback(_check_verinfo)
2411 # This is not enough data to read a block and a share, so the
2412 # wrapper should attempt to read this from the remote server.
2413 d.addCallback(_make_mr, 123)
2414 d.addCallback(lambda mr:
2415 mr.get_block_and_salt(0))
# NOTE(review): the tuple parameter below is Python 2-only syntax (removed
# by PEP 3113).
# NOTE(review): this check is only attached after the second
# get_block_and_salt call further down; the result of the call just above
# is passed to _make_mr as `ignored` and is not itself compared.
2416 def _check_block_and_salt((block, salt)):
2417 self.failUnlessEqual(block, self.block)
2418 self.failUnlessEqual(salt, self.salt)
2419 self.failUnlessEqual(self.rref.read_count, 1)
2420 # This should be enough data to read one block.
2421 d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2422 d.addCallback(lambda mr:
2423 mr.get_block_and_salt(0))
2424 d.addCallback(_check_block_and_salt)
# SDMF counterpart of the prefetched-data test: a reader built with a
# prefetched prefix of an SDMF share must answer the version-info checks
# with no remote read, and the block-and-salt check expects
# self.rref.read_count to be 2.
2428 def test_read_with_prefetched_sdmf_data(self):
2429 sdmf_data = self.build_test_sdmf_share()
2430 self.write_sdmf_share_to_server("si1")
# Callback helper: builds a reader that is handed the first `length` bytes
# of the share as already-fetched data.
2431 def _make_mr(ignored, length):
2432 mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2435 d = defer.succeed(None)
2436 # This should be enough to get us the encoding parameters,
2437 # offset table, and everything else we need to build a verinfo
2439 d.addCallback(_make_mr, 123)
2440 d.addCallback(lambda mr:
2442 def _check_verinfo(verinfo):
2443 self.failUnless(verinfo)
2444 self.failUnlessEqual(len(verinfo), 9)
2454 self.failUnlessEqual(seqnum, 0)
2455 self.failUnlessEqual(root_hash, self.root_hash)
2456 self.failUnlessEqual(salt, self.salt)
# For SDMF the segment size equals the data length (36).
2457 self.failUnlessEqual(segsize, 36)
2458 self.failUnlessEqual(datalen, 36)
2459 self.failUnlessEqual(k, 3)
2460 self.failUnlessEqual(n, 10)
2461 expected_prefix = struct.pack(SIGNED_PREFIX,
2470 self.failUnlessEqual(expected_prefix, prefix)
2471 self.failUnlessEqual(self.rref.read_count, 0)
2472 d.addCallback(_check_verinfo)
2473 # This shouldn't be enough to read any share data.
2474 d.addCallback(_make_mr, 123)
2475 d.addCallback(lambda mr:
2476 mr.get_block_and_salt(0))
# NOTE(review): the tuple parameter below is Python 2-only syntax (removed
# by PEP 3113).
2477 def _check_block_and_salt((block, salt)):
2478 self.failUnlessEqual(block, self.block * 6)
2479 self.failUnlessEqual(salt, self.salt)
2480 # TODO: Fix the read routine so that it reads only the data
2481 # that it has cached if it can't read all of it.
2482 self.failUnlessEqual(self.rref.read_count, 2)
2484 # This should be enough to read share data.
2485 d.addCallback(_make_mr, self.offsets['share_data'])
2486 d.addCallback(lambda mr:
2487 mr.get_block_and_salt(0))
2488 d.addCallback(_check_block_and_salt)
# Checks the reader against an empty MDMF share: the encoding parameters
# must read back as k=3, n=10, segsize=0, datalen=0, and asking for block 0
# must fail with LayoutInvalid.
2492 def test_read_with_empty_mdmf_file(self):
2493 # Some tests upload a file with no contents to test things
2494 # unrelated to the actual handling of the content of the file.
2495 # The reader should behave intelligently in these cases.
2496 self.write_test_share_to_server("si1", empty=True)
2497 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2498 # We should be able to get the encoding parameters, and they
2499 # should be correct.
2500 d = defer.succeed(None)
2501 d.addCallback(lambda ignored:
2502 mr.get_encoding_parameters())
2503 def _check_encoding_parameters(params):
2504 self.failUnlessEqual(len(params), 4)
2505 k, n, segsize, datalen = params
2506 self.failUnlessEqual(k, 3)
2507 self.failUnlessEqual(n, 10)
2508 self.failUnlessEqual(segsize, 0)
2509 self.failUnlessEqual(datalen, 0)
2510 d.addCallback(_check_encoding_parameters)
2512 # We should not be able to fetch a block, since there are no
2514 d.addCallback(lambda ignored:
2515 self.shouldFail(LayoutInvalid, "get block on empty file",
2517 mr.get_block_and_salt, 0))
# SDMF counterpart of the empty-file test: the encoding parameters must
# read back as k=3, n=10, segsize=0, datalen=0, and asking for block 0 must
# fail with LayoutInvalid.
2521 def test_read_with_empty_sdmf_file(self):
2522 self.write_sdmf_share_to_server("si1", empty=True)
2523 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2524 # We should be able to get the encoding parameters, and they
2526 d = defer.succeed(None)
2527 d.addCallback(lambda ignored:
2528 mr.get_encoding_parameters())
2529 def _check_encoding_parameters(params):
2530 self.failUnlessEqual(len(params), 4)
2531 k, n, segsize, datalen = params
2532 self.failUnlessEqual(k, 3)
2533 self.failUnlessEqual(n, 10)
2534 self.failUnlessEqual(segsize, 0)
2535 self.failUnlessEqual(datalen, 0)
2536 d.addCallback(_check_encoding_parameters)
2538 # It does not make sense to get a block in this format, so we
2539 # should not be able to.
2540 d.addCallback(lambda ignored:
2541 self.shouldFail(LayoutInvalid, "get block on an empty file",
2543 mr.get_block_and_salt, 0))
# Checks the 9-element version-information tuple the reader produces for an
# SDMF share: its fields, the packed prefix, and the offsets table.
2547 def test_verinfo_with_sdmf_file(self):
2548 self.write_sdmf_share_to_server("si1")
2549 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2550 # We should be able to get the version information.
2551 d = defer.succeed(None)
2552 d.addCallback(lambda ignored:
2554 def _check_verinfo(verinfo):
2555 self.failUnless(verinfo)
2556 self.failUnlessEqual(len(verinfo), 9)
2566 self.failUnlessEqual(seqnum, 0)
2567 self.failUnlessEqual(root_hash, self.root_hash)
2568 self.failUnlessEqual(salt, self.salt)
2569 self.failUnlessEqual(segsize, 36)
2570 self.failUnlessEqual(datalen, 36)
2571 self.failUnlessEqual(k, 3)
2572 self.failUnlessEqual(n, 10)
# struct format: big-endian; unsigned byte, unsigned 64-bit int, 32-byte
# string, 16-byte string, two unsigned bytes, two unsigned 64-bit ints.
2573 expected_prefix = struct.pack(">BQ32s16s BBQQ",
2582 self.failUnlessEqual(prefix, expected_prefix)
2583 self.failUnlessEqual(offsets, self.offsets)
2584 d.addCallback(_check_verinfo)
# Checks the 9-element version-information tuple the reader produces for an
# MDMF share: its fields, the packed prefix, and the offsets table.
2588 def test_verinfo_with_mdmf_file(self):
2589 self.write_test_share_to_server("si1")
2590 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2591 d = defer.succeed(None)
2592 d.addCallback(lambda ignored:
2594 def _check_verinfo(verinfo):
2595 self.failUnless(verinfo)
2596 self.failUnlessEqual(len(verinfo), 9)
2606 self.failUnlessEqual(seqnum, 0)
2607 self.failUnlessEqual(root_hash, self.root_hash)
2609 self.failUnlessEqual(segsize, 6)
2610 self.failUnlessEqual(datalen, 36)
2611 self.failUnlessEqual(k, 3)
2612 self.failUnlessEqual(n, 10)
# struct format: big-endian; unsigned byte, unsigned 64-bit int, 32-byte
# string, two unsigned bytes, two unsigned 64-bit ints. Unlike the SDMF
# prefix format used in test_verinfo_with_sdmf_file there is no 16-byte
# field.
2613 expected_prefix = struct.pack(">BQ32s BBQQ",
2621 self.failUnlessEqual(prefix, expected_prefix)
2622 self.failUnlessEqual(offsets, self.offsets)
2623 d.addCallback(_check_verinfo)
# Drives an SDMFSlotWriteProxy through every put_* step, checks that
# nothing reaches the server until finish_publishing() is called
# (write_count stays 0), and then checks that exactly one write happened and
# reads the stored share back.
2627 def test_sdmf_writer(self):
2628 # Go through the motions of writing an SDMF share to the storage
2629 # server. Then read the storage server to see that the share got
2630 # written in the way that we think it should have.
2632 # We do this first so that the necessary instance variables get
2633 # set the way we want them for the tests below.
2634 data = self.build_test_sdmf_share()
2635 sdmfr = SDMFSlotWriteProxy(0,
2640 # Put the block and salt.
2641 sdmfr.put_block(self.blockdata, 0, self.salt)
2643 # Put the encprivkey
2644 sdmfr.put_encprivkey(self.encprivkey)
2646 # Put the block and share hash chains
2647 sdmfr.put_blockhashes(self.block_hash_tree)
2648 sdmfr.put_sharehashes(self.share_hash_chain)
2649 sdmfr.put_root_hash(self.root_hash)
2652 sdmfr.put_signature(self.signature)
2654 # Put the verification key
2655 sdmfr.put_verification_key(self.verification_key)
2657 # Now check to make sure that nothing has been written yet.
2658 self.failUnlessEqual(self.rref.write_count, 0)
2660 # Now finish publishing
2661 d = sdmfr.finish_publishing()
2663 self.failUnlessEqual(self.rref.write_count, 1)
# Read the stored share directly from the storage server, starting at
# offset 0 and spanning len(data) bytes of share 0.
2664 read = self.ss.remote_slot_readv
2665 self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2667 d.addCallback(_then)
# Checks checkstring handling when a share already exists on the server:
# the first finish_publishing() must report failure (results[0] false), and
# after the writer is given the checkstring taken from that result, a second
# finish_publishing() must succeed and the stored share is read back.
2671 def test_sdmf_writer_preexisting_share(self):
2672 data = self.build_test_sdmf_share()
2673 self.write_sdmf_share_to_server("si1")
2675 # Now there is a share on the storage server. To successfully
2676 # write, we need to set the checkstring correctly. When we
2677 # don't, no write should occur.
2678 sdmfw = SDMFSlotWriteProxy(0,
2683 sdmfw.put_block(self.blockdata, 0, self.salt)
2685 # Put the encprivkey
2686 sdmfw.put_encprivkey(self.encprivkey)
2688 # Put the block and share hash chains
2689 sdmfw.put_blockhashes(self.block_hash_tree)
2690 sdmfw.put_sharehashes(self.share_hash_chain)
2693 sdmfw.put_root_hash(self.root_hash)
2696 sdmfw.put_signature(self.signature)
2698 # Put the verification key
2699 sdmfw.put_verification_key(self.verification_key)
2701 # We shouldn't have a checkstring yet
2702 self.failUnlessEqual(sdmfw.get_checkstring(), "")
2704 d = sdmfw.finish_publishing()
2706 self.failIf(results[0])
2707 # this is the correct checkstring
# results[1][0][0] is taken from the failed write's result and stored on
# self so the later callback can compare against it.
2708 self._expected_checkstring = results[1][0][0]
2709 return self._expected_checkstring
2711 d.addCallback(_then)
# The value returned by _then is passed straight to set_checkstring.
2712 d.addCallback(sdmfw.set_checkstring)
2713 d.addCallback(lambda ignored:
2714 sdmfw.get_checkstring())
2715 d.addCallback(lambda checkstring:
2716 self.failUnlessEqual(checkstring, self._expected_checkstring))
2717 d.addCallback(lambda ignored:
2718 sdmfw.finish_publishing())
2719 def _then_again(results):
2720 self.failUnless(results[0])
2721 read = self.ss.remote_slot_readv
# Bytes 1-8 of the stored share must hold the big-endian 64-bit value 1.
# NOTE(review): presumably this is the sequence number -- confirm against
# the SDMF header layout.
2722 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2723 {0: [struct.pack(">Q", 1)]})
2724 self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2726 d.addCallback(_then_again)
# Tests for the latency statistics kept by StorageServer (add_latency /
# get_latencies).
2730 class Stats(unittest.TestCase):
# Fixture: a LoggingServiceParent that the servers made by create() are
# attached to, plus a counter stored as self._lease_secret.
2733 self.sparent = LoggingServiceParent()
2734 self._lease_secret = itertools.count()
2736 return self.sparent.stopService()
# Working directory for the test called `name`, under storage/Server/.
2738 def workdir(self, name):
2739 basedir = os.path.join("storage", "Server", name)
# Builds a StorageServer in the test's working directory, using a node id of
# 20 zero bytes, and attaches it to self.sparent.
2742 def create(self, name):
2743 workdir = self.workdir(name)
2744 ss = StorageServer(workdir, "\x00" * 20)
2745 ss.setServiceParent(self.sparent)
# Feeds different numbers of samples into several latency categories and
# checks the reported mean and percentiles for each.
2748 def test_latencies(self):
2749 ss = self.create("test_latencies")
2750 for i in range(10000):
2751 ss.add_latency("allocate", 1.0 * i)
2752 for i in range(1000):
2753 ss.add_latency("renew", 1.0 * i)
2755 ss.add_latency("write", 1.0 * i)
2757 ss.add_latency("cancel", 2.0 * i)
2758 ss.add_latency("get", 5.0)
2760 output = ss.get_latencies()
2762 self.failUnlessEqual(sorted(output.keys()),
2763 sorted(["allocate", "renew", "cancel", "write", "get"]))
# 10000 samples were added but only 1000 are kept; the expected mean of
# about 9500 shows that it is the most recent 1000 (values 9000-9999).
2764 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2765 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2766 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2767 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2768 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2769 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2770 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2771 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2772 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2774 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2775 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2776 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
2777 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2778 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2779 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2780 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2781 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2782 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
# With only 20 samples the 1st, 99th and 99.9th percentiles are expected to
# be reported as None.
2784 self.failUnlessEqual(len(ss.latencies["write"]), 20)
2785 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2786 self.failUnless(output["write"]["01_0_percentile"] is None, output)
2787 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
2788 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2789 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2790 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2791 self.failUnless(output["write"]["99_0_percentile"] is None, output)
2792 self.failUnless(output["write"]["99_9_percentile"] is None, output)
# With 10 samples the 95th percentile is expected to be None as well.
2794 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2795 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2796 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2797 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
2798 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2799 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2800 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2801 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2802 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
# A single sample: every statistic, including the mean, is expected to be
# None.
2804 self.failUnlessEqual(len(ss.latencies["get"]), 1)
2805 self.failUnless(output["get"]["mean"] is None, output)
2806 self.failUnless(output["get"]["01_0_percentile"] is None, output)
2807 self.failUnless(output["get"]["10_0_percentile"] is None, output)
2808 self.failUnless(output["get"]["50_0_percentile"] is None, output)
2809 self.failUnless(output["get"]["90_0_percentile"] is None, output)
2810 self.failUnless(output["get"]["95_0_percentile"] is None, output)
2811 self.failUnless(output["get"]["99_0_percentile"] is None, output)
2812 self.failUnless(output["get"]["99_9_percentile"] is None, output)
# Replace every <...> tag in s with a single space, ...
2815 s = re.sub(r'<[^>]*>', ' ', s)
# ... then collapse each run of whitespace into one space, so tests can
# match text across what used to be markup boundaries.
2816 s = re.sub(r'\s+', ' ', s)
# BucketCountingCrawler subclass for tests: after the normal
# finished_prefix() handling it takes the next Deferred from the front of
# self.hook_ds (a list that the test installs on the crawler).
2819 class MyBucketCountingCrawler(BucketCountingCrawler):
2820 def finished_prefix(self, cycle, prefix):
2821 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2823 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler."""

    def add_bucket_counter(self):
        """Create the bucket-counting crawler and attach it to this server.

        The crawler is given the path of "bucket_counter.state" inside
        self.storedir as its state file, is stored on the server as
        self.bucket_counter, and gets this server as its service parent.
        """
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        crawler = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
# Tests for the bucket-counting crawler of StorageServer and for how its
# progress is shown on the StorageStatus web page.
2832 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
# Fixture: a started MultiService that each test attaches its storage server
# to; stopping it is what the teardown returns.
2835 self.s = service.MultiService()
2836 self.s.startService()
2838 return self.s.stopService()
# Renders the status page before the crawler has started, right after its
# first prefix, and after a complete cycle.
2840 def test_bucket_counter(self):
2841 basedir = "storage/BucketCounter/bucket_counter"
2842 fileutil.make_dirs(basedir)
2843 ss = StorageServer(basedir, "\x00" * 20)
2844 # to make sure we capture the bucket-counting-crawler in the middle
2845 # of a cycle, we reach in and reduce its maximum slice time to 0. We
2846 # also make it start sooner than usual.
2847 ss.bucket_counter.slow_start = 0
# The original slice length is remembered so _check2 can restore it.
2848 orig_cpu_slice = ss.bucket_counter.cpu_slice
2849 ss.bucket_counter.cpu_slice = 0
2850 ss.setServiceParent(self.s)
2852 w = StorageStatus(ss)
2854 # this sample is before the crawler has started doing anything
2855 html = w.renderSynchronously()
2856 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2857 s = remove_tags(html)
2858 self.failUnlessIn("Accepting new shares: Yes", s)
2859 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2860 self.failUnlessIn("Total buckets: Not computed yet", s)
2861 self.failUnlessIn("Next crawl in", s)
2863 # give the bucket-counting-crawler one tick to get started. The
2864 # cpu_slice=0 will force it to yield right after it processes the
2867 d = fireEventually()
2868 def _check(ignored):
2869 # are we really right after the first prefix?
2870 state = ss.bucket_counter.get_state()
# No prefix finished yet: schedule _check again on a later turn.
2871 if state["last-complete-prefix"] is None:
2872 d2 = fireEventually()
2873 d2.addCallback(_check)
2875 self.failUnlessEqual(state["last-complete-prefix"],
2876 ss.bucket_counter.prefixes[0])
2877 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2878 html = w.renderSynchronously()
2879 s = remove_tags(html)
2880 self.failUnlessIn(" Current crawl ", s)
2881 self.failUnlessIn(" (next work in ", s)
2882 d.addCallback(_check)
2884 # now give it enough time to complete a full cycle
2886 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2887 d.addCallback(lambda ignored: self.poll(_watch))
2888 def _check2(ignored):
2889 ss.bucket_counter.cpu_slice = orig_cpu_slice
2890 html = w.renderSynchronously()
2891 s = remove_tags(html)
2892 self.failUnlessIn("Total buckets: 0 (the number of", s)
# Either wording is accepted for the time until the next crawl.
2893 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2894 d.addCallback(_check2)
# Plants bogus entries in the crawler state in the middle of a cycle and
# checks that they are gone once the cycle has finished.
2897 def test_bucket_counter_cleanup(self):
2898 basedir = "storage/BucketCounter/bucket_counter_cleanup"
2899 fileutil.make_dirs(basedir)
2900 ss = StorageServer(basedir, "\x00" * 20)
2901 # to make sure we capture the bucket-counting-crawler in the middle
2902 # of a cycle, we reach in and reduce its maximum slice time to 0.
2903 ss.bucket_counter.slow_start = 0
2904 orig_cpu_slice = ss.bucket_counter.cpu_slice
2905 ss.bucket_counter.cpu_slice = 0
2906 ss.setServiceParent(self.s)
2908 d = fireEventually()
2910 def _after_first_prefix(ignored):
# Uses the crawler's .state attribute directly (not get_state()), so the
# bogus entries written below land in the state that save_state() persists.
2911 state = ss.bucket_counter.state
2912 if state["last-complete-prefix"] is None:
2913 d2 = fireEventually()
2914 d2.addCallback(_after_first_prefix)
2916 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2917 # now sneak in and mess with its state, to make sure it cleans up
2918 # properly at the end of the cycle
2919 self.failUnlessEqual(state["last-complete-prefix"],
2920 ss.bucket_counter.prefixes[0])
2921 state["bucket-counts"][-12] = {}
2922 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2923 ss.bucket_counter.save_state()
2924 d.addCallback(_after_first_prefix)
2926 # now give it enough time to complete a cycle
2928 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2929 d.addCallback(lambda ignored: self.poll(_watch))
2930 def _check2(ignored):
2931 ss.bucket_counter.cpu_slice = orig_cpu_slice
2932 s = ss.bucket_counter.get_state()
2933 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2934 self.failIf("bogusprefix!" in s["storage-index-samples"],
2935 s["storage-index-samples"].keys())
2936 d.addCallback(_check2)
# Checks the progress text on the status page at three points, each
# triggered by one of the hook Deferreds installed on the crawler.
2939 def test_bucket_counter_eta(self):
2940 basedir = "storage/BucketCounter/bucket_counter_eta"
2941 fileutil.make_dirs(basedir)
2942 ss = MyStorageServer(basedir, "\x00" * 20)
2943 ss.bucket_counter.slow_start = 0
2944 # these will be fired inside finished_prefix()
2945 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2946 w = StorageStatus(ss)
# d collects failures from the hook callbacks via the errbacks added below.
2948 d = defer.Deferred()
2950 def _check_1(ignored):
2951 # no ETA is available yet
2952 html = w.renderSynchronously()
2953 s = remove_tags(html)
2954 self.failUnlessIn("complete (next work", s)
2956 def _check_2(ignored):
2957 # one prefix has finished, so an ETA based upon that elapsed time
2958 # should be available.
2959 html = w.renderSynchronously()
2960 s = remove_tags(html)
2961 self.failUnlessIn("complete (ETA ", s)
2963 def _check_3(ignored):
2964 # two prefixes have finished
2965 html = w.renderSynchronously()
2966 s = remove_tags(html)
2967 self.failUnlessIn("complete (ETA ", s)
2970 hooks[0].addCallback(_check_1).addErrback(d.errback)
2971 hooks[1].addCallback(_check_2).addErrback(d.errback)
2972 hooks[2].addCallback(_check_3).addErrback(d.errback)
# The server is attached to the service parent only after the hooks are
# wired up.
2974 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler with a one-shot "stop after first bucket" switch.

    A test sets stop_after_first_bucket to True. After the next bucket
    has been processed the flag is cleared and cpu_slice is set to -1.0;
    once the flag is clear, yielding() sets cpu_slice to 500.
    """

    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        """Process the bucket normally, then apply the one-shot switch."""
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # One-shot: clear the flag before shrinking the slice.
        # NOTE(review): a negative cpu_slice presumably makes the crawler
        # treat its time slice as already used up -- confirm in the crawler.
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        """Set cpu_slice to 500 unless the one-shot switch is still armed."""
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
# Plain attribute holder; No_ST_BLOCKS_LeaseCheckingCrawler copies selected
# attributes of another object onto an instance of this class.
2988 class BrokenStatResults:
# LeaseCheckingCrawler variant for tests: builds a BrokenStatResults object
# from the attributes of `s`, with special handling for names starting with
# "_" and for "st_blocks".
# NOTE(review): presumably `s` is an os.stat() result and both special cases
# are skipped, simulating a platform without st_blocks -- confirm.
2990 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
2993 bsr = BrokenStatResults()
2994 for attrname in dir(s):
2995 if attrname.startswith("_"):
2997 if attrname == "st_blocks":
2999 setattr(bsr, attrname, getattr(s, attrname))
# StorageServer that uses InstrumentedLeaseCheckingCrawler as its lease
# checker class.
3002 class InstrumentedStorageServer(StorageServer):
3003 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
# StorageServer that uses No_ST_BLOCKS_LeaseCheckingCrawler as its lease
# checker class.
3004 class No_ST_BLOCKS_StorageServer(StorageServer):
3005 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
# Tests for the lease-checking crawler (ss.lease_checker) and for how
# StorageStatus renders its state as HTML and JSON.
3007 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# Fixture: self.s is a started MultiService; each test attaches its storage
# server to it with ss.setServiceParent(self.s), and stopService() is returned
# at the end.
# NOTE(review): the `def` lines of the fixture methods are not visible in this
# excerpt; presumably the first two statements are setUp and the last one is
# tearDown — confirm against the full file.
3010 self.s = service.MultiService()
3011 self.s.startService()
3013 return self.s.stopService()
3015 def make_shares(self, ss):
# Populate storage server `ss` with four buckets and remember their secrets.
#
# Two immutable buckets are created through ss.remote_allocate_buckets() and
# two mutable ones through ss.remote_slot_testv_and_readv_and_writev(), each
# written with the same 1000-byte payload. immutable_si_1 and mutable_si_3
# additionally receive a second lease via ss.remote_add_lease().
#
# Results are stored on self:
#   self.sis            - the four storage indexes, in creation order
#   self.renew_secrets  - [rs0, rs1, rs1a, rs2, rs3, rs3a]
#   self.cancel_secrets - [cs0, cs1, cs1a, cs2, cs3, cs3a]
# The tests index into self.renew_secrets by position, so the order matters.
#
# NOTE(review): make() is called below but its `def` line is not visible in
# this excerpt (the first two `return` lines presumably belong to it); the
# tails of the remote_allocate_buckets() calls and the definition of
# `sharenums` are not visible either.
3017 return (si, hashutil.tagged_hash("renew", si),
3018 hashutil.tagged_hash("cancel", si))
# like make(), plus a write-enabler secret as a fourth element
3019 def make_mutable(si):
3020 return (si, hashutil.tagged_hash("renew", si),
3021 hashutil.tagged_hash("cancel", si),
3022 hashutil.tagged_hash("write-enabler", si))
# secrets for an additional lease; `num` makes the hash tags distinct
3023 def make_extra_lease(si, num):
3024 return (hashutil.tagged_hash("renew-%d" % num, si),
3025 hashutil.tagged_hash("cancel-%d" % num, si))
3027 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3028 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3029 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3030 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3031 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3032 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3034 canary = FakeCanary()
3035 # note: 'tahoe debug dump-share' will not handle this file, since the
3036 # inner contents are not a valid CHK share
3037 data = "\xff" * 1000
3039 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3041 w[0].remote_write(0, data)
3044 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3046 w[0].remote_write(0, data)
3048 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3050 writev = ss.remote_slot_testv_and_readv_and_writev
3051 writev(mutable_si_2, (we2, rs2, cs2),
3052 {0: ([], [(0,data)], len(data))}, [])
3053 writev(mutable_si_3, (we3, rs3, cs3),
3054 {0: ([], [(0,data)], len(data))}, [])
3055 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3057 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3058 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3059 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3061 def test_basic(self):
# Run one lease-checker cycle on a server built without any expiration
# arguments and inspect the crawler at three points: before the crawl starts,
# right after the first bucket, and after the first full cycle. The state
# dict, the HTML status page and the JSON status are checked; nothing may be
# recovered (all space-recovered counters are 0) and the lease counts of the
# four shares must be unchanged at the end.
# NOTE(review): several names used below (f, DAY, _wait, s) and the test's
# return statement have no visible definition in this excerpt — confirm
# against the full file.
3062 basedir = "storage/LeaseCrawler/basic"
3063 fileutil.make_dirs(basedir)
3064 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3065 # make it start sooner than usual.
3066 lc = ss.lease_checker
3069 lc.stop_after_first_bucket = True
3070 webstatus = StorageStatus(ss)
3072 # create a few shares, with some leases on them
3073 self.make_shares(ss)
3074 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3076 # add a non-sharefile to exercise another code path
3077 fn = os.path.join(ss.sharedir,
3078 storage_index_to_dir(immutable_si_0),
3081 f.write("I am not a share.\n")
3084 # this is before the crawl has started, so we're not in a cycle yet
3085 initial_state = lc.get_state()
3086 self.failIf(lc.get_progress()["cycle-in-progress"])
3087 self.failIfIn("cycle-to-date", initial_state)
3088 self.failIfIn("estimated-remaining-cycle", initial_state)
3089 self.failIfIn("estimated-current-cycle", initial_state)
3090 self.failUnlessIn("history", initial_state)
3091 self.failUnlessEqual(initial_state["history"], {})
# attaching the server to the running MultiService is the last step before the
# in-cycle checks below
3093 ss.setServiceParent(self.s)
3097 d = fireEventually()
3099 # now examine the state right after the first bucket has been
# If the state has no "cycle-to-date" entry yet, the callback schedules itself
# again through a fresh fireEventually() (presumably returning d2 so the chain
# waits — that line is not visible here).
3101 def _after_first_bucket(ignored):
3102 initial_state = lc.get_state()
3103 if "cycle-to-date" not in initial_state:
3104 d2 = fireEventually()
3105 d2.addCallback(_after_first_bucket)
3107 self.failUnlessIn("cycle-to-date", initial_state)
3108 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3109 self.failUnlessIn("estimated-current-cycle", initial_state)
3110 self.failUnlessIn("history", initial_state)
3111 self.failUnlessEqual(initial_state["history"], {})
3113 so_far = initial_state["cycle-to-date"]
3114 self.failUnlessEqual(so_far["expiration-enabled"], False)
3115 self.failUnlessIn("configured-expiration-mode", so_far)
3116 self.failUnlessIn("lease-age-histogram", so_far)
3117 lah = so_far["lease-age-histogram"]
3118 self.failUnlessEqual(type(lah), list)
3119 self.failUnlessEqual(len(lah), 1)
3120 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3121 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3122 self.failUnlessEqual(so_far["corrupt-shares"], [])
3123 sr1 = so_far["space-recovered"]
3124 self.failUnlessEqual(sr1["examined-buckets"], 1)
3125 self.failUnlessEqual(sr1["examined-shares"], 1)
3126 self.failUnlessEqual(sr1["actual-shares"], 0)
3127 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3128 self.failUnlessEqual(sr1["original-sharebytes"], 0)
3129 left = initial_state["estimated-remaining-cycle"]
3130 sr2 = left["space-recovered"]
3131 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3132 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3133 self.failIfEqual(sr2["actual-shares"], None)
3134 self.failIfEqual(sr2["configured-diskbytes"], None)
3135 self.failIfEqual(sr2["original-sharebytes"], None)
3136 d.addCallback(_after_first_bucket)
3137 d.addCallback(lambda ign: self.render1(webstatus))
# HTML rendered while the cycle is still in progress (one bucket examined)
3138 def _check_html_in_cycle(html):
3139 s = remove_tags(html)
3140 self.failUnlessIn("So far, this cycle has examined "
3141 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3142 self.failUnlessIn("and has recovered: "
3143 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3144 "0 B (0 B / 0 B)", s)
3145 self.failUnlessIn("If expiration were enabled, "
3146 "we would have recovered: "
3147 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3148 " 0 B (0 B / 0 B) by now", s)
3149 self.failUnlessIn("and the remainder of this cycle "
3150 "would probably recover: "
3151 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3152 " 0 B (0 B / 0 B)", s)
3153 self.failUnlessIn("and the whole cycle would probably recover: "
3154 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3155 " 0 B (0 B / 0 B)", s)
3156 self.failUnlessIn("if we were strictly using each lease's default "
3157 "31-day lease lifetime", s)
3158 self.failUnlessIn("this cycle would be expected to recover: ", s)
3159 d.addCallback(_check_html_in_cycle)
3161 # wait for the crawler to finish the first cycle. Nothing should have
# poll predicate: true once the state reports a finished cycle
3164 return bool(lc.get_state()["last-cycle-finished"] is not None)
3165 d.addCallback(lambda ign: self.poll(_wait))
# After the cycle: the in-progress keys are gone and history[0] holds the
# totals (4 buckets / 4 shares examined, 6 leases in the age histogram).
3167 def _after_first_cycle(ignored):
3169 self.failIf("cycle-to-date" in s)
3170 self.failIf("estimated-remaining-cycle" in s)
3171 self.failIf("estimated-current-cycle" in s)
3172 last = s["history"][0]
3173 self.failUnlessIn("cycle-start-finish-times", last)
3174 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3175 self.failUnlessEqual(last["expiration-enabled"], False)
3176 self.failUnlessIn("configured-expiration-mode", last)
3178 self.failUnlessIn("lease-age-histogram", last)
3179 lah = last["lease-age-histogram"]
3180 self.failUnlessEqual(type(lah), list)
3181 self.failUnlessEqual(len(lah), 1)
3182 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3184 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3185 self.failUnlessEqual(last["corrupt-shares"], [])
3187 rec = last["space-recovered"]
3188 self.failUnlessEqual(rec["examined-buckets"], 4)
3189 self.failUnlessEqual(rec["examined-shares"], 4)
3190 self.failUnlessEqual(rec["actual-buckets"], 0)
3191 self.failUnlessEqual(rec["original-buckets"], 0)
3192 self.failUnlessEqual(rec["configured-buckets"], 0)
3193 self.failUnlessEqual(rec["actual-shares"], 0)
3194 self.failUnlessEqual(rec["original-shares"], 0)
3195 self.failUnlessEqual(rec["configured-shares"], 0)
3196 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3197 self.failUnlessEqual(rec["original-diskbytes"], 0)
3198 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3199 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3200 self.failUnlessEqual(rec["original-sharebytes"], 0)
3201 self.failUnlessEqual(rec["configured-sharebytes"], 0)
# the leases created by make_shares() must all still be present
3203 def _get_sharefile(si):
3204 return list(ss._iter_share_files(si))[0]
3205 def count_leases(si):
3206 return len(list(_get_sharefile(si).get_leases()))
3207 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3208 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3209 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3210 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3211 d.addCallback(_after_first_cycle)
3212 d.addCallback(lambda ign: self.render1(webstatus))
3213 def _check_html(html):
3214 s = remove_tags(html)
3215 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3216 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3217 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3218 "(2 mutable / 2 immutable),", s)
3219 self.failUnlessIn("but expiration was not enabled", s)
3220 d.addCallback(_check_html)
3221 d.addCallback(lambda ign: self.render_json(webstatus))
# the JSON status must expose both the lease-checker state and its progress
3222 def _check_json(json):
3223 data = simplejson.loads(json)
3224 self.failUnlessIn("lease-checker", data)
3225 self.failUnlessIn("lease-checker-progress", data)
3226 d.addCallback(_check_json)
def backdate_lease(self, sf, renew_secret, new_expire_time):
    """Overwrite the expiration time of one lease stored in a share file.

    sf: share file object providing get_leases(), a ``home`` path and
        _write_lease_record(f, index, lease).
    renew_secret: selects the lease to modify; the first lease whose
        renew_secret matches is rewritten.
    new_expire_time: value stored into the lease's expiration_time; it may
        be older than the current one.

    Returns None once the lease record has been rewritten.
    Raises IndexError if no lease carries the given renew_secret.
    """
    # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
    # "renew" a lease with a new_expire_time that is older than what the
    # current lease has), so we have to reach inside it.
    for i, lease in enumerate(sf.get_leases()):
        if lease.renew_secret != renew_secret:
            continue
        lease.expiration_time = new_expire_time
        f = open(sf.home, 'rb+')
        try:
            sf._write_lease_record(f, i, lease)
        finally:
            # release the handle even when the write fails, so the share
            # file is never left open behind the test
            f.close()
        return
    raise IndexError("unable to renew non-existent lease")
3242 def test_expire_age(self):
# Expiration in "age" mode with expiration_override_lease_duration=2000.
# The first lease of every share is back-dated to `now - 1000`; after one
# crawler cycle the two single-lease shares (immutable_si_0, mutable_si_2)
# must be gone and the two double-lease shares must survive with one lease.
# The recorded history and the HTML status page are checked as well.
# NOTE(review): `now`, `_wait`, `s` and the test's return statement have no
# visible definition in this excerpt — confirm against the full file.
3243 basedir = "storage/LeaseCrawler/expire_age"
3244 fileutil.make_dirs(basedir)
3245 # setting expiration_time to 2000 means that any lease which is more
3246 # than 2000s old will be expired.
3247 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3248 expiration_enabled=True,
3249 expiration_mode="age",
3250 expiration_override_lease_duration=2000)
3251 # make it start sooner than usual.
3252 lc = ss.lease_checker
3254 lc.stop_after_first_bucket = True
3255 webstatus = StorageStatus(ss)
3257 # create a few shares, with some leases on them
3258 self.make_shares(ss)
3259 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# small helpers over ss._iter_share_files(); _get_sharefile() takes the first
# share file found for the storage index
3261 def count_shares(si):
3262 return len(list(ss._iter_share_files(si)))
3263 def _get_sharefile(si):
3264 return list(ss._iter_share_files(si))[0]
3265 def count_leases(si):
3266 return len(list(_get_sharefile(si).get_leases()))
3268 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3269 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3270 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3271 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3272 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3273 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3274 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3275 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3277 # artificially crank back the expiration time on the first lease of
3278 # each share, to make it look like it expired already (age=1000s).
3279 # Some shares have an extra lease which is set to expire at the
3280 # default time in 31 days from now (age=31days). We then run the
3281 # crawler, which will expire the first lease, making some shares get
3282 # deleted and others stay alive (with one remaining lease)
# indexes into self.renew_secrets follow make_shares():
# [rs0, rs1, rs1a, rs2, rs3, rs3a]
3285 sf0 = _get_sharefile(immutable_si_0)
3286 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3287 sf0_size = os.stat(sf0.home).st_size
3289 # immutable_si_1 gets an extra lease
3290 sf1 = _get_sharefile(immutable_si_1)
3291 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3293 sf2 = _get_sharefile(mutable_si_2)
3294 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3295 sf2_size = os.stat(sf2.home).st_size
3297 # mutable_si_3 gets an extra lease
3298 sf3 = _get_sharefile(mutable_si_3)
3299 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3301 ss.setServiceParent(self.s)
3303 d = fireEventually()
3304 # examine the state right after the first bucket has been processed
# re-schedules itself until the crawler reports a cycle in progress
# (presumably returning d2 — that line is not visible here)
3305 def _after_first_bucket(ignored):
3306 p = lc.get_progress()
3307 if not p["cycle-in-progress"]:
3308 d2 = fireEventually()
3309 d2.addCallback(_after_first_bucket)
3311 d.addCallback(_after_first_bucket)
3312 d.addCallback(lambda ign: self.render1(webstatus))
3313 def _check_html_in_cycle(html):
3314 s = remove_tags(html)
3315 # the first bucket encountered gets deleted, and its prefix
3316 # happens to be about 1/5th of the way through the ring, so the
3317 # predictor thinks we'll have 5 shares and that we'll delete them
3318 # all. This part of the test depends upon the SIs landing right
3319 # where they do now.
3320 self.failUnlessIn("The remainder of this cycle is expected to "
3321 "recover: 4 shares, 4 buckets", s)
3322 self.failUnlessIn("The whole cycle is expected to examine "
3323 "5 shares in 5 buckets and to recover: "
3324 "5 shares, 5 buckets", s)
3325 d.addCallback(_check_html_in_cycle)
3327 # wait for the crawler to finish the first cycle. Two shares should
# poll predicate: true once the state reports a finished cycle
3330 return bool(lc.get_state()["last-cycle-finished"] is not None)
3331 d.addCallback(lambda ign: self.poll(_wait))
3333 def _after_first_cycle(ignored):
3334 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3335 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3336 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3337 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3338 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3339 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3342 last = s["history"][0]
3344 self.failUnlessEqual(last["expiration-enabled"], True)
3345 self.failUnlessEqual(last["configured-expiration-mode"],
3346 ("age", 2000, None, ("mutable", "immutable")))
3347 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3349 rec = last["space-recovered"]
3350 self.failUnlessEqual(rec["examined-buckets"], 4)
3351 self.failUnlessEqual(rec["examined-shares"], 4)
3352 self.failUnlessEqual(rec["actual-buckets"], 2)
3353 self.failUnlessEqual(rec["original-buckets"], 2)
3354 self.failUnlessEqual(rec["configured-buckets"], 2)
3355 self.failUnlessEqual(rec["actual-shares"], 2)
3356 self.failUnlessEqual(rec["original-shares"], 2)
3357 self.failUnlessEqual(rec["configured-shares"], 2)
# the recovered share bytes are the sizes of the two deleted share files,
# measured before the crawl
3358 size = sf0_size + sf2_size
3359 self.failUnlessEqual(rec["actual-sharebytes"], size)
3360 self.failUnlessEqual(rec["original-sharebytes"], size)
3361 self.failUnlessEqual(rec["configured-sharebytes"], size)
3362 # different platforms have different notions of "blocks used by
3363 # this file", so merely assert that it's a number
3364 self.failUnless(rec["actual-diskbytes"] >= 0,
3365 rec["actual-diskbytes"])
3366 self.failUnless(rec["original-diskbytes"] >= 0,
3367 rec["original-diskbytes"])
3368 self.failUnless(rec["configured-diskbytes"] >= 0,
3369 rec["configured-diskbytes"])
3370 d.addCallback(_after_first_cycle)
3371 d.addCallback(lambda ign: self.render1(webstatus))
# the status page is expected to show the 2000-second limit as "33 minutes"
3372 def _check_html(html):
3373 s = remove_tags(html)
3374 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3375 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3376 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3377 d.addCallback(_check_html)
3380 def test_expire_cutoff_date(self):
# Expiration in "cutoff-date" mode with the cutoff set to `then` (now - 2000).
# The first lease of every share is rewritten to expire at
# now - 3000 + 31 days; after one crawler cycle the two single-lease shares
# must be gone and the two double-lease shares must survive with one lease.
# Unlike test_expire_age, the "original-*" counters are expected to stay 0
# here while the "actual-*" and "configured-*" counters report 2 shares.
# NOTE(review): `now`, `_wait`, `s`, the tail of the leases-per-share-histogram
# assertion and the test's return statement are not visible in this excerpt —
# confirm against the full file.
3381 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3382 fileutil.make_dirs(basedir)
3383 # setting cutoff-date to 2000 seconds ago means that any lease which
3384 # is more than 2000s old will be expired.
3386 then = int(now - 2000)
3387 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3388 expiration_enabled=True,
3389 expiration_mode="cutoff-date",
3390 expiration_cutoff_date=then)
3391 # make it start sooner than usual.
3392 lc = ss.lease_checker
3394 lc.stop_after_first_bucket = True
3395 webstatus = StorageStatus(ss)
3397 # create a few shares, with some leases on them
3398 self.make_shares(ss)
3399 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# small helpers over ss._iter_share_files(); _get_sharefile() takes the first
# share file found for the storage index
3401 def count_shares(si):
3402 return len(list(ss._iter_share_files(si)))
3403 def _get_sharefile(si):
3404 return list(ss._iter_share_files(si))[0]
3405 def count_leases(si):
3406 return len(list(_get_sharefile(si).get_leases()))
3408 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3409 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3410 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3411 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3412 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3413 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3414 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3415 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3417 # artificially crank back the expiration time on the first lease of
3418 # each share, to make it look like was renewed 3000s ago. To achieve
3419 # this, we need to set the expiration time to now-3000+31days. This
3420 # will change when the lease format is improved to contain both
3421 # create/renew time and duration.
3422 new_expiration_time = now - 3000 + 31*24*60*60
3424 # Some shares have an extra lease which is set to expire at the
3425 # default time in 31 days from now (age=31days). We then run the
3426 # crawler, which will expire the first lease, making some shares get
3427 # deleted and others stay alive (with one remaining lease)
# indexes into self.renew_secrets follow make_shares():
# [rs0, rs1, rs1a, rs2, rs3, rs3a]
3429 sf0 = _get_sharefile(immutable_si_0)
3430 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3431 sf0_size = os.stat(sf0.home).st_size
3433 # immutable_si_1 gets an extra lease
3434 sf1 = _get_sharefile(immutable_si_1)
3435 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3437 sf2 = _get_sharefile(mutable_si_2)
3438 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3439 sf2_size = os.stat(sf2.home).st_size
3441 # mutable_si_3 gets an extra lease
3442 sf3 = _get_sharefile(mutable_si_3)
3443 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3445 ss.setServiceParent(self.s)
3447 d = fireEventually()
3448 # examine the state right after the first bucket has been processed
# re-schedules itself until the crawler reports a cycle in progress
# (presumably returning d2 — that line is not visible here)
3449 def _after_first_bucket(ignored):
3450 p = lc.get_progress()
3451 if not p["cycle-in-progress"]:
3452 d2 = fireEventually()
3453 d2.addCallback(_after_first_bucket)
3455 d.addCallback(_after_first_bucket)
3456 d.addCallback(lambda ign: self.render1(webstatus))
3457 def _check_html_in_cycle(html):
3458 s = remove_tags(html)
3459 # the first bucket encountered gets deleted, and its prefix
3460 # happens to be about 1/5th of the way through the ring, so the
3461 # predictor thinks we'll have 5 shares and that we'll delete them
3462 # all. This part of the test depends upon the SIs landing right
3463 # where they do now.
3464 self.failUnlessIn("The remainder of this cycle is expected to "
3465 "recover: 4 shares, 4 buckets", s)
3466 self.failUnlessIn("The whole cycle is expected to examine "
3467 "5 shares in 5 buckets and to recover: "
3468 "5 shares, 5 buckets", s)
3469 d.addCallback(_check_html_in_cycle)
3471 # wait for the crawler to finish the first cycle. Two shares should
# poll predicate: true once the state reports a finished cycle
3474 return bool(lc.get_state()["last-cycle-finished"] is not None)
3475 d.addCallback(lambda ign: self.poll(_wait))
3477 def _after_first_cycle(ignored):
3478 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3479 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3480 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3481 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3482 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3483 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3486 last = s["history"][0]
3488 self.failUnlessEqual(last["expiration-enabled"], True)
3489 self.failUnlessEqual(last["configured-expiration-mode"],
3490 ("cutoff-date", None, then,
3491 ("mutable", "immutable")))
3492 self.failUnlessEqual(last["leases-per-share-histogram"],
3495 rec = last["space-recovered"]
3496 self.failUnlessEqual(rec["examined-buckets"], 4)
3497 self.failUnlessEqual(rec["examined-shares"], 4)
3498 self.failUnlessEqual(rec["actual-buckets"], 2)
3499 self.failUnlessEqual(rec["original-buckets"], 0)
3500 self.failUnlessEqual(rec["configured-buckets"], 2)
3501 self.failUnlessEqual(rec["actual-shares"], 2)
3502 self.failUnlessEqual(rec["original-shares"], 0)
3503 self.failUnlessEqual(rec["configured-shares"], 2)
# the recovered share bytes are the sizes of the two deleted share files,
# measured before the crawl
3504 size = sf0_size + sf2_size
3505 self.failUnlessEqual(rec["actual-sharebytes"], size)
3506 self.failUnlessEqual(rec["original-sharebytes"], 0)
3507 self.failUnlessEqual(rec["configured-sharebytes"], size)
3508 # different platforms have different notions of "blocks used by
3509 # this file", so merely assert that it's a number
3510 self.failUnless(rec["actual-diskbytes"] >= 0,
3511 rec["actual-diskbytes"])
3512 self.failUnless(rec["original-diskbytes"] >= 0,
3513 rec["original-diskbytes"])
3514 self.failUnless(rec["configured-diskbytes"] >= 0,
3515 rec["configured-diskbytes"])
3516 d.addCallback(_after_first_cycle)
3517 d.addCallback(lambda ign: self.render1(webstatus))
# the expected sentence is rebuilt from `then`, formatted in UTC with both an
# ISO date and a day-month-year form
3518 def _check_html(html):
3519 s = remove_tags(html)
3520 self.failUnlessIn("Expiration Enabled:"
3521 " expired leases will be removed", s)
3522 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3523 substr = "Leases created or last renewed before %s will be considered expired." % date
3524 self.failUnlessIn(substr, s)
3525 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3526 d.addCallback(_check_html)
3529 def test_only_immutable(self):
# Cutoff-date expiration restricted to expiration_sharetypes=("immutable",).
# Every lease on every share is rewritten to the same back-dated expiration
# time; after one cycle both immutable shares must be gone while the mutable
# shares keep all of their leases. The status page must name the sharetype.
# NOTE(review): `now`, `_wait` and the test's return statement are not visible
# in this excerpt — confirm against the full file.
3530 basedir = "storage/LeaseCrawler/only_immutable"
3531 fileutil.make_dirs(basedir)
3533 then = int(now - 2000)
3534 ss = StorageServer(basedir, "\x00" * 20,
3535 expiration_enabled=True,
3536 expiration_mode="cutoff-date",
3537 expiration_cutoff_date=then,
3538 expiration_sharetypes=("immutable",))
3539 lc = ss.lease_checker
3541 webstatus = StorageStatus(ss)
3543 self.make_shares(ss)
3544 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3545 # set all leases to be expirable
3546 new_expiration_time = now - 3000 + 31*24*60*60
3548 def count_shares(si):
3549 return len(list(ss._iter_share_files(si)))
3550 def _get_sharefile(si):
3551 return list(ss._iter_share_files(si))[0]
3552 def count_leases(si):
3553 return len(list(_get_sharefile(si).get_leases()))
# indexes into self.renew_secrets follow make_shares():
# [rs0, rs1, rs1a, rs2, rs3, rs3a] — both leases of sf1 and sf3 are rewritten
3555 sf0 = _get_sharefile(immutable_si_0)
3556 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3557 sf1 = _get_sharefile(immutable_si_1)
3558 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3559 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3560 sf2 = _get_sharefile(mutable_si_2)
3561 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3562 sf3 = _get_sharefile(mutable_si_3)
3563 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3564 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3566 ss.setServiceParent(self.s)
# poll predicate: true once the state reports a finished cycle
3568 return bool(lc.get_state()["last-cycle-finished"] is not None)
3569 d = self.poll(_wait)
3571 def _after_first_cycle(ignored):
3572 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3573 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3574 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3575 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3576 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3577 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3578 d.addCallback(_after_first_cycle)
3579 d.addCallback(lambda ign: self.render1(webstatus))
3580 def _check_html(html):
3581 s = remove_tags(html)
3582 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3583 d.addCallback(_check_html)
3586 def test_only_mutable(self):
# Cutoff-date expiration restricted to expiration_sharetypes=("mutable",).
# Every lease on every share is rewritten to the same back-dated expiration
# time; after one cycle both mutable shares must be gone while the immutable
# shares keep all of their leases. The status page must name the sharetype.
# NOTE(review): `now`, `_wait` and the test's return statement are not visible
# in this excerpt — confirm against the full file.
3587 basedir = "storage/LeaseCrawler/only_mutable"
3588 fileutil.make_dirs(basedir)
3590 then = int(now - 2000)
3591 ss = StorageServer(basedir, "\x00" * 20,
3592 expiration_enabled=True,
3593 expiration_mode="cutoff-date",
3594 expiration_cutoff_date=then,
3595 expiration_sharetypes=("mutable",))
3596 lc = ss.lease_checker
3598 webstatus = StorageStatus(ss)
3600 self.make_shares(ss)
3601 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3602 # set all leases to be expirable
3603 new_expiration_time = now - 3000 + 31*24*60*60
3605 def count_shares(si):
3606 return len(list(ss._iter_share_files(si)))
3607 def _get_sharefile(si):
3608 return list(ss._iter_share_files(si))[0]
3609 def count_leases(si):
3610 return len(list(_get_sharefile(si).get_leases()))
# indexes into self.renew_secrets follow make_shares():
# [rs0, rs1, rs1a, rs2, rs3, rs3a] — both leases of sf1 and sf3 are rewritten
3612 sf0 = _get_sharefile(immutable_si_0)
3613 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3614 sf1 = _get_sharefile(immutable_si_1)
3615 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3616 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3617 sf2 = _get_sharefile(mutable_si_2)
3618 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3619 sf3 = _get_sharefile(mutable_si_3)
3620 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3621 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3623 ss.setServiceParent(self.s)
# poll predicate: true once the state reports a finished cycle
3625 return bool(lc.get_state()["last-cycle-finished"] is not None)
3626 d = self.poll(_wait)
3628 def _after_first_cycle(ignored):
3629 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3630 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3631 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3632 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3633 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3634 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3635 d.addCallback(_after_first_cycle)
3636 d.addCallback(lambda ign: self.render1(webstatus))
3637 def _check_html(html):
3638 s = remove_tags(html)
3639 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3640 d.addCallback(_check_html)
def test_bad_mode(self):
    """StorageServer must reject an unknown expiration_mode with ValueError."""
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    nodeid = "\x00" * 20
    # failUnlessRaises hands back the exception so its message can be checked
    err = self.failUnlessRaises(ValueError, StorageServer, workdir, nodeid,
                                expiration_mode="bogus")
    expected = "GC mode 'bogus' must be 'age' or 'cutoff-date'"
    self.failUnlessIn(expected, str(err))
3651 def test_parse_duration(self):
# time_format.parse_duration() must turn "<number><unit>" strings (with or
# without a space, singular or plural, "mo"/"month" abbreviations) into the
# matching multiple of DAY, MONTH or YEAR, and raise ValueError with a
# "no unit" message for an unknown unit.
# NOTE(review): DAY, MONTH and YEAR are used below but their definitions are
# not visible in this excerpt — confirm against the full file.
3655 p = time_format.parse_duration
3656 self.failUnlessEqual(p("7days"), 7*DAY)
3657 self.failUnlessEqual(p("31day"), 31*DAY)
3658 self.failUnlessEqual(p("60 days"), 60*DAY)
3659 self.failUnlessEqual(p("2mo"), 2*MONTH)
3660 self.failUnlessEqual(p("3 month"), 3*MONTH)
3661 self.failUnlessEqual(p("2years"), 2*YEAR)
3662 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3663 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    """time_format.parse_date() must map an ISO date to an int timestamp."""
    parse = time_format.parse_date
    text = "2009-03-18"
    # the parsed value doubles as the failure message
    self.failUnless(isinstance(parse(text), int), parse(text))
    self.failUnlessEqual(parse(text), 1237334400)
3670 def test_limited_history(self):
# Let the lease checker run until lc.state["last-cycle-finished"] reaches at
# least 15, then check that the history mapping `h` holds exactly 10 entries
# whose keys run from 6 to 15 (i.e. only the most recent cycles are kept).
# NOTE(review): the assignment of `h`, the statements under the `if` in the
# poll predicate and the test's return statement are not visible in this
# excerpt — confirm against the full file.
3671 basedir = "storage/LeaseCrawler/limited_history"
3672 fileutil.make_dirs(basedir)
3673 ss = StorageServer(basedir, "\x00" * 20)
3674 # make it start sooner than usual.
3675 lc = ss.lease_checker
3679 # create a few shares, with some leases on them
3680 self.make_shares(ss)
3682 ss.setServiceParent(self.s)
# poll predicate; last-cycle-finished is None until the first cycle completes
3684 def _wait_until_15_cycles_done():
3685 last = lc.state["last-cycle-finished"]
3686 if last is not None and last >= 15:
3691 d = self.poll(_wait_until_15_cycles_done)
3693 def _check(ignored):
3696 self.failUnlessEqual(len(h), 10)
3697 self.failUnlessEqual(max(h.keys()), 15)
3698 self.failUnlessEqual(min(h.keys()), 6)
3699 d.addCallback(_check)
3702 def test_unpredictable_future(self):
# With cpu_slice forced to -1.0 the crawler is inspected very early in its
# first cycle. As soon as the state contains "cycle-to-date", every
# space-recovered estimate is expected to be None, i.e. the crawler declines
# to predict the rest of the cycle.
# NOTE(review): the assignment of `s` inside _check, the statement after
# d2.addCallback() (presumably `return d2`) and the test's return statement
# are not visible in this excerpt — confirm against the full file.
3703 basedir = "storage/LeaseCrawler/unpredictable_future"
3704 fileutil.make_dirs(basedir)
3705 ss = StorageServer(basedir, "\x00" * 20)
3706 # make it start sooner than usual.
3707 lc = ss.lease_checker
3709 lc.cpu_slice = -1.0 # stop quickly
3711 self.make_shares(ss)
3713 ss.setServiceParent(self.s)
3715 d = fireEventually()
3716 def _check(ignored):
3717 # this should fire after the first bucket is complete, but before
3718 # the first prefix is complete, so the progress-measurer won't
3719 # think we've gotten far enough to raise our percent-complete
3720 # above 0%, triggering the cannot-predict-the-future code in
3721 # expirer.py . This will have to change if/when the
3722 # progress-measurer gets smart enough to count buckets (we'll
3723 # have to interrupt it even earlier, before it's finished the
# not in a cycle yet: try again on a later reactor turn
3726 if "cycle-to-date" not in s:
3727 d2 = fireEventually()
3728 d2.addCallback(_check)
3730 self.failUnlessIn("cycle-to-date", s)
3731 self.failUnlessIn("estimated-remaining-cycle", s)
3732 self.failUnlessIn("estimated-current-cycle", s)
3734 left = s["estimated-remaining-cycle"]["space-recovered"]
3735 self.failUnlessEqual(left["actual-buckets"], None)
3736 self.failUnlessEqual(left["original-buckets"], None)
3737 self.failUnlessEqual(left["configured-buckets"], None)
3738 self.failUnlessEqual(left["actual-shares"], None)
3739 self.failUnlessEqual(left["original-shares"], None)
3740 self.failUnlessEqual(left["configured-shares"], None)
3741 self.failUnlessEqual(left["actual-diskbytes"], None)
3742 self.failUnlessEqual(left["original-diskbytes"], None)
3743 self.failUnlessEqual(left["configured-diskbytes"], None)
3744 self.failUnlessEqual(left["actual-sharebytes"], None)
3745 self.failUnlessEqual(left["original-sharebytes"], None)
3746 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): `full` reads "estimated-remaining-cycle" again, so the block
# below repeats the checks on `left` and "estimated-current-cycle" is only
# tested for presence. This looks like a copy-paste slip; presumably
# "estimated-current-cycle" was intended — confirm before changing.
3748 full = s["estimated-remaining-cycle"]["space-recovered"]
3749 self.failUnlessEqual(full["actual-buckets"], None)
3750 self.failUnlessEqual(full["original-buckets"], None)
3751 self.failUnlessEqual(full["configured-buckets"], None)
3752 self.failUnlessEqual(full["actual-shares"], None)
3753 self.failUnlessEqual(full["original-shares"], None)
3754 self.failUnlessEqual(full["configured-shares"], None)
3755 self.failUnlessEqual(full["actual-diskbytes"], None)
3756 self.failUnlessEqual(full["original-diskbytes"], None)
3757 self.failUnlessEqual(full["configured-diskbytes"], None)
3758 self.failUnlessEqual(full["actual-sharebytes"], None)
3759 self.failUnlessEqual(full["original-sharebytes"], None)
3760 self.failUnlessEqual(full["configured-sharebytes"], None)
3762 d.addCallback(_check)
3765 def test_no_st_blocks(self):
# Run one cycle on a No_ST_BLOCKS_StorageServer in "age" mode with a negative
# override duration (expiration_enabled is not passed). After the cycle the
# "configured-*" counters must cover all four shares, and
# configured-diskbytes must equal configured-sharebytes.
# NOTE(review): `_wait`, the assignment of `s` inside _check and the test's
# return statement are not visible in this excerpt — confirm against the
# full file.
3766 basedir = "storage/LeaseCrawler/no_st_blocks"
3767 fileutil.make_dirs(basedir)
3768 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3769 expiration_mode="age",
3770 expiration_override_lease_duration=-1000)
3771 # a negative expiration_time= means the "configured-"
3772 # space-recovered counts will be non-zero, since all shares will have
3775 # make it start sooner than usual.
3776 lc = ss.lease_checker
3779 self.make_shares(ss)
3780 ss.setServiceParent(self.s)
# poll predicate: true once the state reports a finished cycle
3782 return bool(lc.get_state()["last-cycle-finished"] is not None)
3783 d = self.poll(_wait)
3785 def _check(ignored):
3787 last = s["history"][0]
3788 rec = last["space-recovered"]
3789 self.failUnlessEqual(rec["configured-buckets"], 4)
3790 self.failUnlessEqual(rec["configured-shares"], 4)
3791 self.failUnless(rec["configured-sharebytes"] > 0,
3792 rec["configured-sharebytes"])
3793 # without the .st_blocks field in os.stat() results, we should be
3794 # reporting diskbytes==sharebytes
3795 self.failUnlessEqual(rec["configured-sharebytes"],
3796 rec["configured-diskbytes"])
3797 d.addCallback(_check)
3800 def test_share_corruption(self):
# Corrupts share 0 of the lowest storage index, adds an empty bucket
# directory, and checks that the lease checker reports the corrupt share in
# its state, in the JSON status, and in the HTML status -- both after the
# first bucket and after the first full cycle.
# NOTE(review): many statements are not visible in this extract (the list
# terminator after the ignored-error classes, the code that opens the share
# file as `f`, the `def _wait():` header, and the assignments of `s` read in
# _after_first_bucket / _after_first_cycle) -- confirm against the full file.
3801 self._poll_should_ignore_these_errors = [
3802 UnknownMutableContainerVersionError,
3803 UnknownImmutableContainerVersionError,
3805 basedir = "storage/LeaseCrawler/share_corruption"
3806 fileutil.make_dirs(basedir)
3807 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3808 w = StorageStatus(ss)
3809 # make it start sooner than usual.
3810 lc = ss.lease_checker
3811 lc.stop_after_first_bucket = True
3815 # create a few shares, with some leases on them
3816 self.make_shares(ss)
3818 # now corrupt one, and make sure the lease-checker keeps going
3819 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# The corrupted share is the one with the smallest storage index; its base32
# form is what the assertions below compare against.
3820 first = min(self.sis)
3821 first_b32 = base32.b2a(first)
3822 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3825 f.write("BAD MAGIC")
3827 # if get_share_file() doesn't see the correct mutable magic, it
3828 # assumes the file is an immutable share, and then
3829 # immutable.ShareFile sees a bad version. So regardless of which kind
3830 # of share we corrupted, this will trigger an
3831 # UnknownImmutableContainerVersionError.
3833 # also create an empty bucket
3834 empty_si = base32.b2a("\x04"*16)
3835 empty_bucket_dir = os.path.join(ss.sharedir,
3836 storage_index_to_dir(empty_si))
3837 fileutil.make_dirs(empty_bucket_dir)
3839 ss.setServiceParent(self.s)
3841 d = fireEventually()
3843 # now examine the state right after the first bucket has been
# _after_first_bucket re-schedules itself through fireEventually() while the
# state has no "cycle-to-date" entry yet.
3845 def _after_first_bucket(ignored):
3847 if "cycle-to-date" not in s:
3848 d2 = fireEventually()
3849 d2.addCallback(_after_first_bucket)
3851 so_far = s["cycle-to-date"]
3852 rec = so_far["space-recovered"]
# One bucket examined, no shares counted, and the corrupt share recorded as
# a (base32 storage index, share number) tuple.
3853 self.failUnlessEqual(rec["examined-buckets"], 1)
3854 self.failUnlessEqual(rec["examined-shares"], 0)
3855 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3856 d.addCallback(_after_first_bucket)
3858 d.addCallback(lambda ign: self.render_json(w))
3859 def _check_json(json):
3860 data = simplejson.loads(json)
3861 # grr. json turns all dict keys into strings.
3862 so_far = data["lease-checker"]["cycle-to-date"]
3863 corrupt_shares = so_far["corrupt-shares"]
3864 # it also turns all tuples into lists
3865 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3866 d.addCallback(_check_json)
3867 d.addCallback(lambda ign: self.render1(w))
3868 def _check_html(html):
3869 s = remove_tags(html)
3870 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3871 d.addCallback(_check_html)
# Polling predicate body: true once "last-cycle-finished" is no longer None.
3874 return bool(lc.get_state()["last-cycle-finished"] is not None)
3875 d.addCallback(lambda ign: self.poll(_wait))
# After the full cycle, the same checks are repeated against history entry 0.
3877 def _after_first_cycle(ignored):
3879 last = s["history"][0]
3880 rec = last["space-recovered"]
3881 self.failUnlessEqual(rec["examined-buckets"], 5)
3882 self.failUnlessEqual(rec["examined-shares"], 3)
3883 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3884 d.addCallback(_after_first_cycle)
3885 d.addCallback(lambda ign: self.render_json(w))
3886 def _check_json_history(json):
3887 data = simplejson.loads(json)
# In the JSON form the history key is the string "0" rather than the int 0.
3888 last = data["lease-checker"]["history"]["0"]
3889 corrupt_shares = last["corrupt-shares"]
3890 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3891 d.addCallback(_check_json_history)
3892 d.addCallback(lambda ign: self.render1(w))
3893 def _check_html_history(html):
3894 s = remove_tags(html)
3895 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3896 d.addCallback(_check_html_history)
# Flush the container-version errors that were logged for the corrupted share.
# NOTE(review): the enclosing callback/cleanup function for this call is not
# visible in this extract.
3899 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3900 UnknownImmutableContainerVersionError)
3905 def render_json(self, page):
# Renders `page` through self.render1 with the request argument t=json.
# NOTE(review): the remainder of this helper (presumably post-processing
# and/or returning `d`) is not visible in this extract.
3906 d = self.render1(page, args={"t": ["json"]})
3909 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# Tests for the StorageStatus page.
# NOTE(review): the method headers around the next three statements are not
# visible in this extract; the first two presumably belong to setUp and the
# last to tearDown -- confirm against the full file.
# A MultiService is created and started; storage servers in the tests below
# are attached to it via setServiceParent(self.s).
3912 self.s = service.MultiService()
3913 self.s.startService()
# Stops the parent service and returns the result of stopService().
3915 return self.s.stopService()
def test_no_server(self):
    # With no storage server attached, the page must say so in its heading.
    status_page = StorageStatus(None)
    rendered = status_page.renderSynchronously()
    self.failUnlessIn("<h1>No Storage Server Running</h1>", rendered)
3922 def test_status(self):
# Renders the status page of a default StorageServer as HTML and as JSON
# and checks a few fields of each.
# NOTE(review): the statement that creates `d` and the assignment of `s`
# read inside _check_json are not visible in this extract -- confirm against
# the full file.
3923 basedir = "storage/WebStatus/status"
3924 fileutil.make_dirs(basedir)
3925 ss = StorageServer(basedir, "\x00" * 20)
3926 ss.setServiceParent(self.s)
3927 w = StorageStatus(ss)
# HTML check: heading on the raw markup, the rest on the tag-stripped text.
3929 def _check_html(html):
3930 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3931 s = remove_tags(html)
3932 self.failUnlessIn("Accepting new shares: Yes", s)
3933 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3934 d.addCallback(_check_html)
3935 d.addCallback(lambda ign: self.render_json(w))
# JSON check: two storage_server.* values plus the presence of the
# "bucket-counter" and "lease-checker" top-level keys.
3936 def _check_json(json):
3937 data = simplejson.loads(json)
3939 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3940 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3941 self.failUnlessIn("bucket-counter", data)
3942 self.failUnlessIn("lease-checker", data)
3943 d.addCallback(_check_json)
3946 def render_json(self, page):
# Renders `page` through self.render1 with the request argument t=json.
# NOTE(review): the remainder of this helper (presumably post-processing
# and/or returning `d`) is not visible in this extract.
3947 d = self.render1(page, args={"t": ["json"]})
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_no_disk_stats(self, mock_get_disk_stats):
    # Some platforms may have no disk stats API. Simulate that by making
    # get_disk_stats raise AttributeError, so the code path is exercised
    # on every platform.
    mock_get_disk_stats.side_effect = AttributeError()

    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)

    # The remaining checks look at the page text with the markup removed.
    text = remove_tags(page)
    for expected in ("Accepting new shares: Yes",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnlessIn(expected, text)
    self.failUnless(server.get_available_space() is None)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_bad_disk_stats(self, mock_get_disk_stats):
    # If the API to get disk stats exists but a call to it fails (here:
    # raises OSError), the status page should show that no shares will be
    # accepted, and get_available_space() should be 0.
    mock_get_disk_stats.side_effect = OSError()

    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)

    # The remaining checks look at the page text with the markup removed.
    text = remove_tags(page)
    for expected in ("Accepting new shares: No",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnlessIn(expected, text)
    self.failUnlessEqual(server.get_available_space(), 0)
3988 @mock.patch('allmydata.util.fileutil.get_disk_stats')
3989 def test_status_right_disk_stats(self, mock_get_disk_stats):
# Feeds canned disk statistics through the patched get_disk_stats and checks
# how the status page and get_available_space() report them.
# NOTE(review): the definitions of `GB` and `total`, and several entries of
# the return_value dict (including its closing brace), are not visible in
# this extract -- confirm against the full file.
3992 free_for_root = 4*GB
3993 free_for_nonroot = 3*GB
3994 reserved_space = 1*GB
3995 used = total - free_for_root
# Expected available space: non-root free space minus the reservation,
# clamped at zero.
3996 avail = max(free_for_nonroot - reserved_space, 0)
3997 mock_get_disk_stats.return_value = {
3999 'free_for_root': free_for_root,
4000 'free_for_nonroot': free_for_nonroot,
4005 basedir = "storage/WebStatus/status_right_disk_stats"
4006 fileutil.make_dirs(basedir)
4007 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4008 expecteddir = ss.sharedir
4009 ss.setServiceParent(self.s)
4010 w = StorageStatus(ss)
4011 html = w.renderSynchronously()
# Every recorded call to the mock must have been made with exactly
# (expecteddir, reserved_space) and no keyword arguments; the call list is
# passed as the failure message.
4013 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4014 mock_get_disk_stats.call_args_list)
4016 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4017 s = remove_tags(html)
4018 self.failUnlessIn("Total disk space: 5.00 GB", s)
4019 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4020 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4021 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4022 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4023 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4024 self.failUnlessEqual(ss.get_available_space(), 2*GB)
def test_readonly(self):
    # A server created with readonly_storage=True must be shown as not
    # accepting new shares.
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)
    self.failUnlessIn("Accepting new shares: No", remove_tags(page))
def test_reserved(self):
    # A server created with reserved_space=10e6 must show that reservation
    # both abbreviated and as an exact byte count.
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)",
                      remove_tags(page))
def test_huge_reserved(self):
    # Render the status page of a server created with reserved_space=10e6
    # and check the "Reserved space" line.
    #
    # This test gets its own base directory. It used to reuse
    # "storage/WebStatus/reserved", which test_reserved also uses, so the
    # two tests shared on-disk server state.
    #
    # NOTE(review): the reserved_space value and the expected text are the
    # same as in test_reserved, so despite its name this test does not yet
    # exercise a "huge" reservation -- confirm the intended value.
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    s = remove_tags(html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
def test_util(self):
    # Exercise the space-rendering helpers of StorageStatus and the
    # module-level remove_prefix helper.
    page = StorageStatus(None)
    render_cases = [
        (page.render_space, None, "?"),
        (page.render_space, 10e6, "10000000"),
        (page.render_abbrev_space, None, "?"),
        (page.render_abbrev_space, 10e6, "10.00 MB"),
    ]
    for render, value, expected in render_cases:
        # The first argument (the rendering context) is always None here.
        self.failUnlessEqual(render(None, value), expected)

    # remove_prefix strips a matching prefix and yields None otherwise.
    self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
    self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)