1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
40 def __init__(self, ignore_disconnectors=False):
41 self.ignore = ignore_disconnectors
42 self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Register f(*args, **kwargs) to fire on simulated disconnect.
        # NOTE(review): the creation of marker "m" (and the likely
        # "return m" / self.ignore guard) is elided in this excerpt — confirm.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # Unregister a previously registered disconnect callback.
        # NOTE(review): guard lines (likely honoring self.ignore) are elided
        # in this excerpt.
        del self.disconnectors[marker]
class FakeStatsProvider:
    # Minimal stand-in for a stats provider: accepts counter bumps and
    # producer registrations. Method bodies are elided in this excerpt
    # (presumably no-ops — confirm against the full file).
    def count(self, name, delta=1):
    def register_producer(self, producer):
60 class Bucket(unittest.TestCase):
61 def make_workdir(self, name):
62 basedir = os.path.join("storage", "Bucket", name)
63 incoming = os.path.join(basedir, "tmp", "bucket")
64 final = os.path.join(basedir, "bucket")
65 fileutil.make_dirs(basedir)
66 fileutil.make_dirs(os.path.join(basedir, "tmp"))
67 return incoming, final
    # Callback stubs: the Bucket TestCase doubles as the BucketWriter's
    # "server" argument, so it must accept these notifications. Bodies are
    # elided in this excerpt (presumably no-ops — confirm).
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
        # Body of make_lease (the "def" line and the owner_num assignment are
        # elided in this excerpt): build a LeaseInfo with random secrets and
        # an expiry ~5000s in the future; nodeid is 20 NUL bytes.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def test_create(self):
        # Write 82 bytes in four sequential slices into a 200-byte bucket.
        incoming, final = self.make_workdir("test_create")
        # NOTE(review): the tail of the BucketWriter(...) call and the final
        # remote_close() are elided in this excerpt.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        # Write three blocks through a BucketWriter, then verify each slice
        # via a BucketReader over the finalized share file.
        incoming, final = self.make_workdir("test_readwrite")
        # NOTE(review): the tail of the BucketWriter(...) call and the
        # bw.remote_close() between write and read are elided here.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    def callRemote(self, methname, *args, **kwargs):
        # Dispatch callRemote("x") to self.target.remote_x, tallying
        # read/write calls, wrapped in maybeDeferred to mimic Foolscap.
        # NOTE(review): the inner "def _call():" line and the read_count
        # increment are elided in this excerpt.
        meth = getattr(self.target, "remote_" + methname)
        return meth(*args, **kwargs)
        if methname == "slot_readv":
        if "writev" in methname:
        self.write_count += 1
        return defer.maybeDeferred(_call)
128 class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        # Build workdirs plus a BucketWriter of the given size, mirroring
        # Bucket.make_workdir but rooted at storage/BucketProxy/<name>.
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        # NOTE(review): the tail of this call and the RemoteBucket/return
        # plumbing (bw, rb, final) are elided in this excerpt.
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    def make_lease(self):
        # Build a LeaseInfo with fresh random secrets and ~5000s expiry.
        # NOTE(review): the owner_num assignment is elided in this excerpt.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    # Server-callback stubs required because this TestCase is passed as the
    # BucketWriter's server argument; bodies elided in this excerpt.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
    def test_create(self):
        # A WriteBucketProxy over a 500-byte bucket must provide
        # IStorageBucketWriter.
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        # NOTE(review): the middle keyword arguments of this constructor call
        # are elided in this excerpt.
        bp = WriteBucketProxy(rb, None,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Round-trip: write a complete share through a WriteBucketProxy,
        # then read every field back through the matching ReadBucketProxy
        # and compare. header_size selects the v1 (0x24) or v2 (0x44) layout.
        #
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
        # NOTE(review): the "for i in range(...)]" tails of these three list
        # comprehensions are elided in this excerpt.
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"
        bw, rb, sharefname = self.make_bucket(name, sharesize)
        # NOTE(review): middle constructor args and the creation of Deferred
        # "d" are elided in this excerpt.
        bp = wbp_class(rb, None,
                       uri_extension_size_max=len(uri_extension))
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())
        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
        # NOTE(review): the "return d1" / final "return d" lines appear to be
        # elided in this excerpt.
        d.addCallback(_start_reading)
243 def test_readwrite_v1(self):
244 return self._do_test_readwrite("test_readwrite_v1",
245 0x24, WriteBucketProxy, ReadBucketProxy)
247 def test_readwrite_v2(self):
248 return self._do_test_readwrite("test_readwrite_v2",
249 0x44, WriteBucketProxy_v2, ReadBucketProxy)
251 class Server(unittest.TestCase):
        # setUp body (its "def" line is elided in this excerpt): start a
        # logging service parent and a counter for unique lease secrets.
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
        # tearDown body (its "def" line is elided): stop the service parent,
        # returning its Deferred so Trial waits for shutdown.
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under storage/Server/.
        # NOTE(review): the "return" line is elided in this excerpt.
        basedir = os.path.join("storage", "Server", name)
    def create(self, name, reserved_space=0, klass=StorageServer):
        # Build a StorageServer (or subclass) in the per-test workdir with a
        # fake stats provider, parented under self.sparent.
        # NOTE(review): the trailing "return ss" appears elided here.
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: constructing a StorageServer in a fresh workdir works.
        self.create("test_create")
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        # Helper: call remote_allocate_buckets with fresh (renew, cancel)
        # secrets derived from the per-test counter.
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): the "if not canary:" guard line is elided in this
        # excerpt; as written this would clobber a caller-supplied canary.
        canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        # Write two bytes at offset 2**32 in a >4GiB share and read them
        # back; skipped on platforms without efficient sparse files.
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
        # NOTE(review): the "if avail <= 4*2**30:" style guard between these
        # two lines is elided in this excerpt.
        avail = fileutil.get_available_space('.', 512*2**20)
        raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
        ss = self.create("test_large_share")
        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))
        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()
        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    def test_dont_overfill_dirs(self):
        # NOTE(review): the docstring quotes around the next three lines are
        # elided in this excerpt.
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        # NOTE(review): the tail of this os.path.join call (the "shares"
        # component) and the remote_close loop are elided in this excerpt.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))
        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        # After closing shares, the per-bucket and per-prefix incoming dirs
        # must be cleaned up, but the top-level incoming/ dir must remain.
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() call is elided here.
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)
    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)
        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)
    def test_allocate(self):
        # Full allocate/write/close/read cycle plus re-allocation semantics:
        # open buckets are not readable, already-present shares are reported
        # in "already", and aborting writers frees their slots.
        ss = self.create("test_allocate")
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): the remote_close()/remote_abort() lines inside
            # this loop are elided in this excerpt.
        # aborting a bucket that was already closed is a no-op
        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        # NOTE(review): the "b_str = str(b[0])" assignment is elided here.
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)
        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))
        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            # NOTE(review): loop bodies (remote_abort calls) are elided in
            # this excerpt for this and the two loops below.
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        for i,wb in writers2.items():
        for i,wb in writers.items():
    def test_bad_container_version(self):
        # Corrupt a share header's version field to 0; valid shares must
        # still be servable while the bad one raises the version error.
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        # NOTE(review): the remote_close() and the open(fn, "rb+")/seek
        # lines around the header rewrite are elided in this excerpt.
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        ss.remote_get_buckets("allocate")
        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))
    def test_disconnect(self):
        # simulate a disconnection
        # Firing the canary's registered disconnectors should delete the
        # incoming shares, so a second allocation starts from scratch.
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            # NOTE(review): the f(*args, **kwargs) invocation and the "del
            # already/writers" cleanup are elided in this excerpt.
        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats, reserved_space=None):
        # NOTE(review): the signature above matches the visible lines; the
        # "reserved_space = 10000" assignment is elided in this excerpt, as
        # are dict-closing braces and several loop bodies below.
        # Mock the disk stats so the server believes 15000 bytes are free,
        # then verify that provisional allocations respect reserved_space.
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)
        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)
        # we abandon the first set, so their provisional allocation should be
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes
        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
        self.failUnlessEqual(len(ss._active_writers), 0)
        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # we have to manually increase available, since we're not doing real
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        # Body of a seek-behavior test (its "def" line is elided in this
        # excerpt): confirm that seeking past EOF in "rb+" mode and writing
        # extends the file without truncating prior contents.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        # NOTE(review): the initial f.write("start")/f.close() lines are
        # elided in this excerpt.
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        # NOTE(review): the f.seek(100)/f.write("100")/f.close() lines are
        # elided in this excerpt.
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        # Exercise lease accounting end-to-end: one lease per upload, a
        # second/third lease on the same SI, add/renew/cancel with correct
        # and incorrect secrets, and overlapping uploads sharing leases.
        # NOTE(review): the "sharenums = range(5)" / "size = 100" setup and
        # the per-writer remote_close() loop bodies are elided throughout
        # this excerpt.
        ss = self.create("test_leases")
        canary = FakeCanary()
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)
        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)
        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
    def test_readonly(self):
        # A readonly server must refuse all allocations and report zero
        # disk_avail / not accepting immutable shares in its stats.
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})
        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        # With discard_storage=True the shares exist but read back as NULs.
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): the wb.remote_close() call is elided here.
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        # Corruption advisories (both the server-level and RIBucketReader
        # entry points) must land as report files naming the share.
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        # NOTE(review): the "report = f.read()" / f.close() lines are elided
        # in this excerpt (here and for report_si1 below).
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)
        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()
        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)
742 class MutableServer(unittest.TestCase):
        # setUp body (its "def" line and startService() call are elided in
        # this excerpt): create the service parent and lease-secret counter.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        # tearDown body (its "def" line is elided): stop the service parent.
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under storage/MutableServer/.
        # NOTE(review): the "return" line is elided in this excerpt.
        basedir = os.path.join("storage", "MutableServer", name)
    def create(self, name):
        # Build a StorageServer in the per-test workdir, parented under
        # self.sparent. NOTE(review): "return ss" appears elided here.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: constructing the mutable-capable server works.
        self.create("test_create")
763 def write_enabler(self, we_tag):
764 return hashutil.tagged_hash("we_blah", we_tag)
766 def renew_secret(self, tag):
767 return hashutil.tagged_hash("renew_blah", str(tag))
769 def cancel_secret(self, tag):
770 return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        # Helper: create empty mutable slots for sharenums via an empty
        # test-and-write vector, then assert the write was accepted.
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        # NOTE(review): the remaining arguments of this call (testandwritev
        # and the read vector) are elided in this excerpt.
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_bad_magic(self):
        # Corrupt a mutable share's magic string on disk; reading the slot
        # must raise UnknownMutableContainerVersionError with a useful message.
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        # NOTE(review): the open(fn, "rb+")/write/close lines that corrupt
        # the magic are elided in this excerpt.
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))
    def test_container_size(self):
        # Growing a mutable container is bounded by MutableShareFile.MAX_SIZE;
        # shrinking it to zero deletes the share.
        # NOTE(review): trailing read-vector arguments of several rstaraw
        # calls are elided in this excerpt.
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})
    def test_allocate(self):
        # Mutable slot lifecycle: empty reads, a plain write, reads past the
        # end, a bad write-enabler, and failing test vectors that must leave
        # the data untouched.
        # NOTE(review): several trailing arguments/expected-value lines of
        # these calls are elided throughout this excerpt.
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Exercises every test-vector comparison operator ("lt", "le", "eq", "ne",
# "ge", "gt") of remote_slot_testv_and_readv_and_writev against the 5-byte
# span "11111" at offset 10.  A failed comparison must return (False, ...)
# and leave the share untouched; a successful one applies the write (100
# bytes of "y") and the share is reset to `data` before the next case.
# NOTE(review): this paste has dropped interior lines (the write-vector
# tails and the resets between cases) — restore from the original file.
912 def test_operators(self):
913 # test operators, the data we're comparing is '11111' in all cases.
914 # test both fail+pass, reset data after each one.
915 ss = self.create("test_operators")
917 secrets = ( self.write_enabler("we1"),
918 self.renew_secret("we1"),
919 self.cancel_secret("we1") )
920 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
921 write = ss.remote_slot_testv_and_readv_and_writev
922 read = ss.remote_slot_readv
925 write("si1", secrets,
926 {0: ([], [(0,data)], None)},
# "lt": "11111" < "11110" is false -> write refused, data unchanged.
932 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
937 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
938 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# a wildcard read (empty sharenum list) must return the same share.
939 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
942 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
947 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
948 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# "lt" against "11112" succeeds, so the write ("y"*100) is applied.
951 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
956 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
957 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# "le": fails only for "11110".
961 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
966 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
967 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
970 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
975 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
976 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
979 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
984 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
985 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# "eq": succeeds only on an exact match.
989 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
994 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
995 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
998 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1003 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1004 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# "ne": the complement of "eq".
1008 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1013 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1014 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1017 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1022 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1023 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# "ge": fails only for "11112".
1027 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1032 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1033 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1036 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1041 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1042 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1045 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1050 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1051 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# "gt": succeeds only for "11110".
1055 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1060 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1061 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1064 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1069 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1070 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1073 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1078 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1079 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1082 # finally, test some operators against empty shares
1083 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1088 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1089 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Writes three shares (0, 1, 2) each filled with 100 copies of its own
# digit, then checks that a wildcard readv (empty sharenum list) returns
# the first 10 bytes of every share.
# NOTE(review): the closing lines of the writev call and of the final
# assertion were lost in this paste — restore from the original file.
1092 def test_readv(self):
1093 ss = self.create("test_readv")
1094 secrets = ( self.write_enabler("we1"),
1095 self.renew_secret("we1"),
1096 self.cancel_secret("we1") )
# `data` is immediately rebound below; this first binding is unused here.
1097 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1098 write = ss.remote_slot_testv_and_readv_and_writev
1099 read = ss.remote_slot_readv
1100 data = [("%d" % i) * 100 for i in range(3)]
1101 rc = write("si1", secrets,
1102 {0: ([], [(0,data[0])], None),
1103 1: ([], [(0,data[1])], None),
1104 2: ([], [(0,data[2])], None),
1106 self.failUnlessEqual(rc, (True, {}))
1108 answer = read("si1", [], [(0, 10)])
1109 self.failUnlessEqual(answer, {0: ["0"*10],
# Asserts that two lease lists are pairwise equal on every field EXCEPT
# expiration_time (used after operations that legitimately refresh the
# timestamps, e.g. container moves).
# NOTE(review): the `a = leases_a[i]` / `b = leases_b[i]` lines appear to
# have been lost in this paste — restore from the original file.
1113 def compare_leases_without_timestamps(self, leases_a, leases_b):
1114 self.failUnlessEqual(len(leases_a), len(leases_b))
1115 for i in range(len(leases_a)):
1118 self.failUnlessEqual(a.owner_num, b.owner_num)
1119 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1120 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1121 self.failUnlessEqual(a.nodeid, b.nodeid)
# Strict variant of compare_leases_without_timestamps: also requires the
# expiration_time of each lease pair to match exactly.
# NOTE(review): the `a = leases_a[i]` / `b = leases_b[i]` lines appear to
# have been lost in this paste — restore from the original file.
1123 def compare_leases(self, leases_a, leases_b):
1124 self.failUnlessEqual(len(leases_a), len(leases_b))
1125 for i in range(len(leases_a)):
1128 self.failUnlessEqual(a.owner_num, b.owner_num)
1129 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1130 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1131 self.failUnlessEqual(a.nodeid, b.nodeid)
1132 self.failUnlessEqual(a.expiration_time, b.expiration_time)
# End-to-end lease lifecycle on a mutable slot: create/renew/add leases,
# survive a container expansion (which moves leases on disk), reject
# renew/cancel with bogus secrets, and verify the share disappears only
# when its last lease is cancelled.
# NOTE(review): the `def secrets(n):` line (orig. 1136) that encloses the
# `return (...)` below was lost in this paste — restore it.
1134 def test_leases(self):
1135 ss = self.create("test_leases")
1137 return ( self.write_enabler("we1"),
1138 self.renew_secret("we1-%d" % n),
1139 self.cancel_secret("we1-%d" % n) )
1140 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1141 write = ss.remote_slot_testv_and_readv_and_writev
1142 read = ss.remote_slot_readv
1143 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1144 self.failUnlessEqual(rc, (True, {}))
1146 # create a random non-numeric file in the bucket directory, to
1147 # exercise the code that's supposed to ignore those.
1148 bucket_dir = os.path.join(self.workdir("test_leases"),
1149 "shares", storage_index_to_dir("si1"))
1150 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1151 f.write("you ought to be ignoring me\n")
1154 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1155 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1157 # add-lease on a missing storage index is silently ignored
1158 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1160 # re-allocate the slots and use the same secrets, that should update
# (same secrets -> the existing lease is refreshed, not duplicated)
1162 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1163 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1166 ss.remote_renew_lease("si1", secrets(0)[1])
1167 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1169 # now allocate them with a bunch of different secrets, to trigger the
1170 # extended lease code. Use add_lease for one of them.
1171 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1172 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1173 secrets2 = secrets(2)
1174 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1175 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1176 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1177 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1178 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1180 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1182 # cancel one of them
1183 ss.remote_cancel_lease("si1", secrets(5)[2])
1184 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1186 all_leases = list(s0.get_leases())
1187 # and write enough data to expand the container, forcing the server
1188 # to move the leases
1189 write("si1", secrets(0),
1190 {0: ([], [(0,data)], 200), },
1193 # read back the leases, make sure they're still intact.
1194 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1196 ss.remote_renew_lease("si1", secrets(0)[1])
1197 ss.remote_renew_lease("si1", secrets(1)[1])
1198 ss.remote_renew_lease("si1", secrets(2)[1])
1199 ss.remote_renew_lease("si1", secrets(3)[1])
1200 ss.remote_renew_lease("si1", secrets(4)[1])
1201 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1202 # get a new copy of the leases, with the current timestamps. Reading
1203 # data and failing to renew/cancel leases should leave the timestamps
1205 all_leases = list(s0.get_leases())
1206 # renewing with a bogus token should prompt an error message
1208 # examine the exception thus raised, make sure the old nodeid is
1209 # present, to provide for share migration
1210 e = self.failUnlessRaises(IndexError,
1211 ss.remote_renew_lease, "si1",
1214 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1215 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1216 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1218 # same for cancelling
1219 self.failUnlessRaises(IndexError,
1220 ss.remote_cancel_lease, "si1",
1222 self.compare_leases(all_leases, list(s0.get_leases()))
1224 # reading shares should not modify the timestamp
1225 read("si1", [], [(0,200)])
1226 self.compare_leases(all_leases, list(s0.get_leases()))
# in-place growth: leases survive (timestamps may be refreshed)
1228 write("si1", secrets(0),
1229 {0: ([], [(200, "make me bigger")], None)}, [])
1230 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1232 write("si1", secrets(0),
1233 {0: ([], [(500, "make me really bigger")], None)}, [])
1234 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1236 # now cancel them all
1237 ss.remote_cancel_lease("si1", secrets(0)[2])
1238 ss.remote_cancel_lease("si1", secrets(1)[2])
1239 ss.remote_cancel_lease("si1", secrets(2)[2])
1240 ss.remote_cancel_lease("si1", secrets(3)[2])
1242 # the slot should still be there
1243 remaining_shares = read("si1", [], [(0,10)])
1244 self.failUnlessEqual(len(remaining_shares), 1)
1245 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1247 # cancelling a non-existent lease should raise an IndexError
1248 self.failUnlessRaises(IndexError,
1249 ss.remote_cancel_lease, "si1", "nonsecret")
1251 # and the slot should still be there
1252 remaining_shares = read("si1", [], [(0,10)])
1253 self.failUnlessEqual(len(remaining_shares), 1)
1254 self.failUnlessEqual(len(list(s0.get_leases())), 1)
# cancelling the final lease deletes the share
1256 ss.remote_cancel_lease("si1", secrets(4)[2])
1257 # now the slot should be gone
1258 no_shares = read("si1", [], [(0,10)])
1259 self.failUnlessEqual(no_shares, {})
1261 # cancelling a lease on a non-existent share should raise an IndexError
1262 self.failUnlessRaises(IndexError,
1263 ss.remote_cancel_lease, "si2", "nonsecret")
# Deletes mutable shares one at a time by truncating each to size zero via
# the writev new-length argument, then verifies the bucket directory itself
# is removed once the last share is gone (the prefix directory remains).
# NOTE(review): the writev test/write-vector tails, the expected-read
# dicts, and the `prefix = si[:2]` line were lost in this paste.
1265 def test_remove(self):
1266 ss = self.create("test_remove")
1267 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1269 readv = ss.remote_slot_readv
1270 writev = ss.remote_slot_testv_and_readv_and_writev
1271 secrets = ( self.write_enabler("we1"),
1272 self.renew_secret("we1"),
1273 self.cancel_secret("we1") )
1274 # delete sh0 by setting its size to zero
1275 answer = writev("si1", secrets,
1278 # the answer should mention all the shares that existed before the
1280 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1281 # but a new read should show only sh1 and sh2
1282 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1285 # delete sh1 by setting its size to zero
1286 answer = writev("si1", secrets,
1289 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1290 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1293 # delete sh2 by setting its size to zero
1294 answer = writev("si1", secrets,
1297 self.failUnlessEqual(answer, (True, {2:[]}) )
1298 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1300 # and the bucket directory should now be gone
1301 si = base32.b2a("si1")
1302 # note: this is a detail of the storage server implementation, and
1303 # may change in the future
1305 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1306 bucketdir = os.path.join(prefixdir, si)
1307 self.failUnless(os.path.exists(prefixdir), prefixdir)
1308 self.failIf(os.path.exists(bucketdir), bucketdir)
# Test case for the MDMF/SDMF slot read/write proxies.  setUp builds a
# storage server plus a full set of synthetic share fields (blocks, salts,
# hash trees, signature, keys) used by build_test_mdmf_share and friends.
# NOTE(review): the `def setUp(self):` (orig. 1312) and `def tearDown(self):`
# (orig. ~1342) lines were lost in this paste — the attribute assignments
# below belong to setUp, the last two lines to tearDown.  Restore them.
1311 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1313 self.sparent = LoggingServiceParent()
1314 self._lease_secret = itertools.count()
1315 self.ss = self.create("MDMFProxies storage test server")
1316 self.rref = RemoteBucket()
1317 self.rref.target = self.ss
1318 self.secrets = (self.write_enabler("we_secret"),
1319 self.renew_secret("renew_secret"),
1320 self.cancel_secret("cancel_secret"))
1321 self.segment = "aaaaaa"
1323 self.salt = "a" * 16
1324 self.block_hash = "a" * 32
1325 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1326 self.share_hash = self.block_hash
1327 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1328 self.signature = "foobarbaz"
1329 self.verification_key = "vvvvvv"
1330 self.encprivkey = "private"
1331 self.root_hash = self.block_hash
1332 self.salt_hash = self.root_hash
1333 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1334 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1335 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1336 # blockhashes and salt hashes are serialized in the same way,
1337 # only we lop off the first element and store that in the
1339 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
# tearDown body: stop the service parent and remove the test workdir.
1343 self.sparent.stopService()
1344 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
def write_enabler(self, we_tag):
    """Derive the deterministic write-enabler secret for *we_tag*."""
    label = "we_blah"
    return hashutil.tagged_hash(label, we_tag)
def renew_secret(self, tag):
    """Derive the lease-renewal secret for *tag* (coerced to str)."""
    text = str(tag)
    return hashutil.tagged_hash("renew_blah", text)
def cancel_secret(self, tag):
    """Derive the lease-cancellation secret for *tag* (coerced to str)."""
    text = str(tag)
    return hashutil.tagged_hash("cancel_blah", text)
# workdir: path for this test's on-disk state under storage/MutableServer.
# create: builds a StorageServer rooted there and attaches it to the
# test's service parent so tearDown can stop it.
# NOTE(review): `return basedir` (orig. 1361) and `return ss` (orig. 1368)
# were lost in this paste — restore them.
1359 def workdir(self, name):
1360 basedir = os.path.join("storage", "MutableServer", name)
1364 def create(self, name):
1365 workdir = self.workdir(name)
# all-zero 20-byte nodeid keeps test output deterministic
1366 ss = StorageServer(workdir, "\x00" * 20)
1367 ss.setServiceParent(self.sparent)
# Hand-assembles a serialized MDMF share from the synthetic fields set up
# in setUp: header (checkstring + encoding parameters), offsets table,
# private key, share hash chain, signature, verification key, padding,
# share data (salt+block pairs), and the block hash tree.  Also records
# every computed offset in self.offsets for the read tests to check.
# NOTE(review): the struct.pack argument lists, the `sharedata = ""`
# initialization, and several pack fields are elided in this paste.
1371 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1372 # Start with the checkstring
1373 data = struct.pack(">BQ32s",
1377 self.checkstring = data
1378 # Next, the encoding parameters
1380 data += struct.pack(">BBQQ",
1386 data += struct.pack(">BBQQ",
1392 data += struct.pack(">BBQQ",
1397 # Now we'll build the offsets.
1399 if not tail_segment and not empty:
1401 sharedata += self.salt + self.block
1404 sharedata += self.salt + self.block
# tail-segment variant: final block is a single byte
1405 sharedata += self.salt + "a"
1407 # The encrypted private key comes after the shares + salts
1408 offset_size = struct.calcsize(MDMFOFFSETS)
1409 encrypted_private_key_offset = len(data) + offset_size
1410 # The share hash chain comes after the private key
1411 sharehashes_offset = encrypted_private_key_offset + \
1412 len(self.encprivkey)
1414 # The signature comes after the share hash chain.
1415 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1417 verification_key_offset = signature_offset + len(self.signature)
1418 verification_key_end = verification_key_offset + \
1419 len(self.verification_key)
# share data lives after fixed-size reserved regions, not the actual
# (smaller) field lengths — hence the *_SIZE constants here.
1421 share_data_offset = offset_size
1422 share_data_offset += PRIVATE_KEY_SIZE
1423 share_data_offset += SIGNATURE_SIZE
1424 share_data_offset += VERIFICATION_KEY_SIZE
1425 share_data_offset += SHARE_HASH_CHAIN_SIZE
1427 blockhashes_offset = share_data_offset + len(sharedata)
1428 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1430 data += struct.pack(MDMFOFFSETS,
1431 encrypted_private_key_offset,
1434 verification_key_offset,
1435 verification_key_end,
1441 self.offsets['enc_privkey'] = encrypted_private_key_offset
1442 self.offsets['block_hash_tree'] = blockhashes_offset
1443 self.offsets['share_hash_chain'] = sharehashes_offset
1444 self.offsets['signature'] = signature_offset
1445 self.offsets['verification_key'] = verification_key_offset
1446 self.offsets['share_data'] = share_data_offset
1447 self.offsets['verification_key_end'] = verification_key_end
1448 self.offsets['EOF'] = eof_offset
1451 data += self.encprivkey
1453 data += self.share_hash_chain_s
1455 data += self.signature
1456 # and the verification key
1457 data += self.verification_key
1458 # Then we'll add in gibberish until we get to the right point.
1459 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1462 # Then the share data
1465 data += self.block_hash_tree_s
# Builds an MDMF share and writes it to the test storage server under
# `storage_index`, share 0, using an empty-share test vector so the write
# always succeeds on a fresh slot.
# NOTE(review): the parameter list tail, docstring delimiters, `tws = {}`,
# and `readv = []` lines are elided in this paste.
1469 def write_test_share_to_server(self,
1474 I write some data for the read tests to read to self.ss
1476 If tail_segment=True, then I will write a share that has a
1477 smaller tail segment than other segments.
1479 write = self.ss.remote_slot_testv_and_readv_and_writev
1480 data = self.build_test_mdmf_share(tail_segment, empty)
1481 # Finally, we write the whole thing to the storage server in one
# test vector: share must currently be empty (length-1 read at 0 == "")
1483 testvs = [(0, 1, "eq", "")]
1485 tws[0] = (testvs, [(0, data)], None)
1487 results = write(storage_index, self.secrets, tws, readv)
1488 self.failUnless(results[0])
# Hand-assembles a serialized SDMF (single-segment) share: prefix header,
# offsets table, then verification key, signature, share hash chain, block
# hash tree, share data, and encrypted private key.  Records the offsets
# in self.offsets for the read tests.
# NOTE(review): the struct.pack argument lists, the signature/encprivkey
# join members, and the `return final_share` tail are elided in this paste.
1491 def build_test_sdmf_share(self, empty=False):
1495 sharedata = self.segment * 6
1496 self.sharedata = sharedata
# k=3, so one block is a third of the share data
1497 blocksize = len(sharedata) / 3
1498 block = sharedata[:blocksize]
1499 self.blockdata = block
1500 prefix = struct.pack(">BQ32s16s BBQQ",
1510 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
# SDMF field order on disk: verification key, signature, share hash
# chain, block hash tree, share data, encrypted private key.
1511 signature_offset = post_offset + len(self.verification_key)
1512 sharehashes_offset = signature_offset + len(self.signature)
1513 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1514 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1515 encprivkey_offset = sharedata_offset + len(block)
1516 eof_offset = encprivkey_offset + len(self.encprivkey)
1517 offsets = struct.pack(">LLLLQQ",
1524 final_share = "".join([prefix,
1526 self.verification_key,
1528 self.share_hash_chain_s,
1529 self.block_hash_tree_s,
1533 self.offsets['signature'] = signature_offset
1534 self.offsets['share_hash_chain'] = sharehashes_offset
1535 self.offsets['block_hash_tree'] = blockhashes_offset
1536 self.offsets['share_data'] = sharedata_offset
1537 self.offsets['enc_privkey'] = encprivkey_offset
1538 self.offsets['EOF'] = eof_offset
# Writes a hand-built SDMF share to the test server so the MDMF proxies
# can be checked for backwards-compatible reads.
# NOTE(review): the parameter list tail, `tws = {}`, and `readv = []`
# lines are elided in this paste.
1542 def write_sdmf_share_to_server(self,
1545 # Some tests need SDMF shares to verify that we can still
1546 # read them. This method writes one, which resembles but is not
1548 write = self.ss.remote_slot_testv_and_readv_and_writev
1549 share = self.build_test_sdmf_share(empty)
# empty-share test vector: succeeds only on a fresh slot
1550 testvs = [(0, 1, "eq", "")]
1552 tws[0] = (testvs, [(0, share)], None)
1554 results = write(storage_index, self.secrets, tws, readv)
1555 self.failUnless(results[0])
1558 def test_read(self):
1559 self.write_test_share_to_server("si1")
1560 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1561 # Check that every method equals what we expect it to.
1562 d = defer.succeed(None)
1563 def _check_block_and_salt((block, salt)):
1564 self.failUnlessEqual(block, self.block)
1565 self.failUnlessEqual(salt, self.salt)
1568 d.addCallback(lambda ignored, i=i:
1569 mr.get_block_and_salt(i))
1570 d.addCallback(_check_block_and_salt)
1572 d.addCallback(lambda ignored:
1573 mr.get_encprivkey())
1574 d.addCallback(lambda encprivkey:
1575 self.failUnlessEqual(self.encprivkey, encprivkey))
1577 d.addCallback(lambda ignored:
1578 mr.get_blockhashes())
1579 d.addCallback(lambda blockhashes:
1580 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1582 d.addCallback(lambda ignored:
1583 mr.get_sharehashes())
1584 d.addCallback(lambda sharehashes:
1585 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1587 d.addCallback(lambda ignored:
1589 d.addCallback(lambda signature:
1590 self.failUnlessEqual(signature, self.signature))
1592 d.addCallback(lambda ignored:
1593 mr.get_verification_key())
1594 d.addCallback(lambda verification_key:
1595 self.failUnlessEqual(verification_key, self.verification_key))
1597 d.addCallback(lambda ignored:
1599 d.addCallback(lambda seqnum:
1600 self.failUnlessEqual(seqnum, 0))
1602 d.addCallback(lambda ignored:
1604 d.addCallback(lambda root_hash:
1605 self.failUnlessEqual(self.root_hash, root_hash))
1607 d.addCallback(lambda ignored:
1609 d.addCallback(lambda seqnum:
1610 self.failUnlessEqual(0, seqnum))
1612 d.addCallback(lambda ignored:
1613 mr.get_encoding_parameters())
1614 def _check_encoding_parameters((k, n, segsize, datalen)):
1615 self.failUnlessEqual(k, 3)
1616 self.failUnlessEqual(n, 10)
1617 self.failUnlessEqual(segsize, 6)
1618 self.failUnlessEqual(datalen, 36)
1619 d.addCallback(_check_encoding_parameters)
1621 d.addCallback(lambda ignored:
1622 mr.get_checkstring())
1623 d.addCallback(lambda checkstring:
1624 self.failUnlessEqual(checkstring, checkstring))
# Writes a share whose last segment is shorter (tail_segment=True) and
# checks that reading segment 5 returns the 1-byte block "a".
# NOTE(review): the closing `return d` line is elided in this paste.
1628 def test_read_with_different_tail_segment_size(self):
1629 self.write_test_share_to_server("si1", tail_segment=True)
1630 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1631 d = mr.get_block_and_salt(5)
1632 def _check_tail_segment(results):
1633 block, salt = results
1634 self.failUnlessEqual(len(block), 1)
1635 self.failUnlessEqual(block, "a")
1636 d.addCallback(_check_tail_segment)
# Requesting a block past the last segment (segnum 7 of a 6-segment file)
# must raise LayoutInvalid.
# NOTE(review): the `return d` tail is elided in this paste.
1640 def test_get_block_with_invalid_segnum(self):
1641 self.write_test_share_to_server("si1")
1642 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1643 d = defer.succeed(None)
1644 d.addCallback(lambda ignored:
1645 self.shouldFail(LayoutInvalid, "test invalid segnum",
1647 mr.get_block_and_salt, 7))
# Calling get_encoding_parameters as the *first* operation on a fresh
# proxy must still return (k=3, n=10, segsize=6, datalen=36).
# NOTE(review): the `return d` tail is elided in this paste.
1651 def test_get_encoding_parameters_first(self):
1652 self.write_test_share_to_server("si1")
1653 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1654 d = mr.get_encoding_parameters()
1655 def _check_encoding_parameters((k, n, segment_size, datalen)):
1656 self.failUnlessEqual(k, 3)
1657 self.failUnlessEqual(n, 10)
1658 self.failUnlessEqual(segment_size, 6)
1659 self.failUnlessEqual(datalen, 36)
1660 d.addCallback(_check_encoding_parameters)
# get_seqnum as the first operation on a fresh proxy must return 0.
# NOTE(review): the `d = mr.get_seqnum()` line (orig. 1667) and the
# `return d` tail are elided in this paste — restore them.
1664 def test_get_seqnum_first(self):
1665 self.write_test_share_to_server("si1")
1666 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1668 d.addCallback(lambda seqnum:
1669 self.failUnlessEqual(seqnum, 0))
# get_root_hash as the first operation on a fresh proxy must return the
# root hash stored by build_test_mdmf_share.
# NOTE(review): the `return d` tail is elided in this paste.
1673 def test_get_root_hash_first(self):
1674 self.write_test_share_to_server("si1")
1675 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1676 d = mr.get_root_hash()
1677 d.addCallback(lambda root_hash:
1678 self.failUnlessEqual(root_hash, self.root_hash))
# get_checkstring as the first operation on a fresh proxy must return the
# header checkstring saved in setUp via build_test_mdmf_share.
# NOTE(review): the `return d` tail is elided in this paste.
1682 def test_get_checkstring_first(self):
1683 self.write_test_share_to_server("si1")
1684 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1685 d = mr.get_checkstring()
1686 d.addCallback(lambda checkstring:
1687 self.failUnlessEqual(checkstring, self.checkstring))
# First publish succeeds; then the checkstring is clobbered so a second
# publish fails its test vector, and the returned read vector must contain
# the share's *old* checkstring so the caller can diagnose the conflict.
# NOTE(review): the `for i in xrange(6):` loop header, the failure branch
# of _then, and the trailing republish/`return d` lines are elided.
1691 def test_write_read_vectors(self):
1692 # When writing for us, the storage server will return to us a
1693 # read vector, along with its result. If a write fails because
1694 # the test vectors failed, this read vector can help us to
1695 # diagnose the problem. This test ensures that the read vector
1696 # is working appropriately.
1697 mw = self._make_new_mw("si1", 0)
1700 mw.put_block(self.block, i, self.salt)
1701 mw.put_encprivkey(self.encprivkey)
1702 mw.put_blockhashes(self.block_hash_tree)
1703 mw.put_sharehashes(self.share_hash_chain)
1704 mw.put_root_hash(self.root_hash)
1705 mw.put_signature(self.signature)
1706 mw.put_verification_key(self.verification_key)
1707 d = mw.finish_publishing()
1709 self.failUnless(len(results), 2)
1710 result, readv = results
1711 self.failUnless(result)
# remember the share's real checkstring, then invalidate ours so the
# next publish is rejected by the server-side test vector.
1713 self.old_checkstring = mw.get_checkstring()
1714 mw.set_checkstring("")
1715 d.addCallback(_then)
1716 d.addCallback(lambda ignored:
1717 mw.finish_publishing())
1718 def _then_again(results):
1719 self.failUnlessEqual(len(results), 2)
1720 result, readvs = results
1722 self.failUnlessIn(0, readvs)
1723 readv = readvs[0][0]
1724 self.failUnlessEqual(readv, self.old_checkstring)
1725 d.addCallback(_then_again)
1726 # The checkstring remains the same for the rest of the process.
# MDMF fields must be written in order: once the share hash chain has been
# put, putting the encrypted private key again must raise LayoutInvalid.
# NOTE(review): the `for i in xrange(6):` loop header and `return d` tail
# are elided in this paste.
1730 def test_private_key_after_share_hash_chain(self):
1731 mw = self._make_new_mw("si1", 0)
1732 d = defer.succeed(None)
1734 d.addCallback(lambda ignored, i=i:
1735 mw.put_block(self.block, i, self.salt))
1736 d.addCallback(lambda ignored:
1737 mw.put_encprivkey(self.encprivkey))
1738 d.addCallback(lambda ignored:
1739 mw.put_sharehashes(self.share_hash_chain))
1741 # Now try to put the private key again.
1742 d.addCallback(lambda ignored:
1743 self.shouldFail(LayoutInvalid, "test repeat private key",
1745 mw.put_encprivkey, self.encprivkey))
# Write-ordering check: after the verification key has been put, putting
# the signature again must raise LayoutInvalid.
# NOTE(review): the `for i in xrange(6):` loop header and `return d` tail
# are elided in this paste.
1749 def test_signature_after_verification_key(self):
1750 mw = self._make_new_mw("si1", 0)
1751 d = defer.succeed(None)
1752 # Put everything up to and including the verification key.
1754 d.addCallback(lambda ignored, i=i:
1755 mw.put_block(self.block, i, self.salt))
1756 d.addCallback(lambda ignored:
1757 mw.put_encprivkey(self.encprivkey))
1758 d.addCallback(lambda ignored:
1759 mw.put_blockhashes(self.block_hash_tree))
1760 d.addCallback(lambda ignored:
1761 mw.put_sharehashes(self.share_hash_chain))
1762 d.addCallback(lambda ignored:
1763 mw.put_root_hash(self.root_hash))
1764 d.addCallback(lambda ignored:
1765 mw.put_signature(self.signature))
1766 d.addCallback(lambda ignored:
1767 mw.put_verification_key(self.verification_key))
1768 # Now try to put the signature again. This should fail
1769 d.addCallback(lambda ignored:
1770 self.shouldFail(LayoutInvalid, "signature after verification",
1772 mw.put_signature, self.signature))
# Two writers target the same (storage index, share): the first publish
# succeeds; the second must be rejected by the server's test vector
# (uncoordinated-write detection).
# NOTE(review): _check_failure's assertion body, _write_share's loop
# header, and the `_write_share(mw2)` / `return d` tail are elided.
1776 def test_uncoordinated_write(self):
1777 # Make two mutable writers, both pointing to the same storage
1778 # server, both at the same storage index, and try writing to the
1780 mw1 = self._make_new_mw("si1", 0)
1781 mw2 = self._make_new_mw("si1", 0)
1783 def _check_success(results):
1784 result, readvs = results
1785 self.failUnless(result)
1787 def _check_failure(results):
1788 result, readvs = results
1791 def _write_share(mw):
1793 mw.put_block(self.block, i, self.salt)
1794 mw.put_encprivkey(self.encprivkey)
1795 mw.put_blockhashes(self.block_hash_tree)
1796 mw.put_sharehashes(self.share_hash_chain)
1797 mw.put_root_hash(self.root_hash)
1798 mw.put_signature(self.signature)
1799 mw.put_verification_key(self.verification_key)
1800 return mw.finish_publishing()
1801 d = _write_share(mw1)
1802 d.addCallback(_check_success)
1803 d.addCallback(lambda ignored:
1805 d.addCallback(_check_failure)
# put_block must reject salts that are not exactly 16 bytes, in both
# directions (17 bytes and 15 bytes), with LayoutInvalid.
# NOTE(review): the `return d` tail is elided in this paste.
1809 def test_invalid_salt_size(self):
1810 # Salts need to be 16 bytes in size. Writes that attempt to
1811 # write more or less than this should be rejected.
1812 mw = self._make_new_mw("si1", 0)
1813 invalid_salt = "a" * 17 # 17 bytes
1814 another_invalid_salt = "b" * 15 # 15 bytes
1815 d = defer.succeed(None)
1816 d.addCallback(lambda ignored:
1817 self.shouldFail(LayoutInvalid, "salt too big",
1819 mw.put_block, self.block, 0, invalid_salt))
1820 d.addCallback(lambda ignored:
1821 self.shouldFail(LayoutInvalid, "salt too small",
1823 mw.put_block, self.block, 0,
1824 another_invalid_salt))
# Publishing with a bogus checkstring ("this is a lie") must fail the
# server-side test vector; resetting the checkstring to "" (empty share)
# and republishing must then succeed.
# NOTE(review): the assertion bodies of _check_failure/_check_success, the
# `for i in xrange(6):` loop header, and the `return d` tail are elided.
1828 def test_write_test_vectors(self):
1829 # If we give the write proxy a bogus test vector at
1830 # any point during the process, it should fail to write when we
1832 def _check_failure(results):
1833 self.failUnlessEqual(len(results), 2)
1837 def _check_success(results):
1838 self.failUnlessEqual(len(results), 2)
1840 self.failUnless(results)
1842 mw = self._make_new_mw("si1", 0)
1843 mw.set_checkstring("this is a lie")
1845 mw.put_block(self.block, i, self.salt)
1846 mw.put_encprivkey(self.encprivkey)
1847 mw.put_blockhashes(self.block_hash_tree)
1848 mw.put_sharehashes(self.share_hash_chain)
1849 mw.put_root_hash(self.root_hash)
1850 mw.put_signature(self.signature)
1851 mw.put_verification_key(self.verification_key)
1852 d = mw.finish_publishing()
1853 d.addCallback(_check_failure)
# an empty checkstring matches a not-yet-written share, so retry works
1854 d.addCallback(lambda ignored:
1855 mw.set_checkstring(""))
1856 d.addCallback(lambda ignored:
1857 mw.finish_publishing())
1858 d.addCallback(_check_success)
def serialize_blockhashes(self, blockhashes):
    """Flatten a sequence of block hashes into one concatenated string."""
    parts = []
    for item in blockhashes:
        parts.append(item)
    return "".join(parts)
# Serializes a {sharenum: 32-byte hash} dict into sorted (uint16, 32s)
# records, matching the on-disk share-hash-chain layout.
# NOTE(review): the `return ret` line (orig. 1869) was lost in this paste
# — restore it.
1866 def serialize_sharehashes(self, sharehashes):
1867 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1868 for i in sorted(sharehashes.keys())])
# Full write-path layout check: publishes a 36-byte/6-segment MDMF share,
# then reads the raw bytes back via remote_slot_readv and verifies every
# field (blocks+salts, privkey, hash trees, signature, verification key,
# header fields, and all eight offsets in the offsets table) landed at the
# byte position the MDMF layout prescribes.
# NOTE(review): several loop headers (`for i in xrange(6):`), pack fields,
# and assertion tails are elided in this paste.
1872 def test_write(self):
1873 # This translates to a file with 6 6-byte segments, and with 2-byte
1875 mw = self._make_new_mw("si1", 0)
1876 # Test writing some blocks.
1877 read = self.ss.remote_slot_readv
1878 expected_private_key_offset = struct.calcsize(MDMFHEADER)
# share data starts after the fixed-size reserved field regions
1879 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1880 PRIVATE_KEY_SIZE + \
1882 VERIFICATION_KEY_SIZE + \
1883 SHARE_HASH_CHAIN_SIZE
1884 written_block_size = 2 + len(self.salt)
1885 written_block = self.block + self.salt
1887 mw.put_block(self.block, i, self.salt)
1889 mw.put_encprivkey(self.encprivkey)
1890 mw.put_blockhashes(self.block_hash_tree)
1891 mw.put_sharehashes(self.share_hash_chain)
1892 mw.put_root_hash(self.root_hash)
1893 mw.put_signature(self.signature)
1894 mw.put_verification_key(self.verification_key)
1895 d = mw.finish_publishing()
1896 def _check_publish(results):
1897 self.failUnlessEqual(len(results), 2)
1898 result, ign = results
1899 self.failUnless(result, "publish failed")
# each of the 6 blocks is (block + salt) at its computed slot
1901 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1902 {0: [written_block]})
1904 self.failUnlessEqual(len(self.encprivkey), 7)
1905 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1906 {0: [self.encprivkey]})
1908 expected_block_hash_offset = expected_sharedata_offset + \
1909 (6 * written_block_size)
1910 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1911 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1912 {0: [self.block_hash_tree_s]})
1914 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1915 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1916 {0: [self.share_hash_chain_s]})
# root hash lives in the header at byte 9 (after version + seqnum)
1918 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1919 {0: [self.root_hash]})
1920 expected_signature_offset = expected_share_hash_offset + \
1921 len(self.share_hash_chain_s)
1922 self.failUnlessEqual(len(self.signature), 9)
1923 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1924 {0: [self.signature]})
1926 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1927 self.failUnlessEqual(len(self.verification_key), 6)
1928 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1929 {0: [self.verification_key]})
1931 signable = mw.get_signable()
1932 verno, seq, roothash, k, n, segsize, datalen = \
1933 struct.unpack(">BQ32sBBQQ",
1935 self.failUnlessEqual(verno, 1)
1936 self.failUnlessEqual(seq, 0)
1937 self.failUnlessEqual(roothash, self.root_hash)
1938 self.failUnlessEqual(k, 3)
1939 self.failUnlessEqual(n, 10)
1940 self.failUnlessEqual(segsize, 6)
1941 self.failUnlessEqual(datalen, 36)
1942 expected_eof_offset = expected_block_hash_offset + \
1943 len(self.block_hash_tree_s)
1945 # Check the version number to make sure that it is correct.
1946 expected_version_number = struct.pack(">B", 1)
1947 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1948 {0: [expected_version_number]})
1949 # Check the sequence number to make sure that it is correct
1950 expected_sequence_number = struct.pack(">Q", 0)
1951 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1952 {0: [expected_sequence_number]})
1953 # Check that the encoding parameters (k, N, segement size, data
1954 # length) are what they should be. These are 3, 10, 6, 36
1955 expected_k = struct.pack(">B", 3)
1956 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1958 expected_n = struct.pack(">B", 10)
1959 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1961 expected_segment_size = struct.pack(">Q", 6)
1962 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1963 {0: [expected_segment_size]})
1964 expected_data_length = struct.pack(">Q", 36)
1965 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1966 {0: [expected_data_length]})
# offsets table: eight 8-byte big-endian offsets starting at byte 59
1967 expected_offset = struct.pack(">Q", expected_private_key_offset)
1968 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1969 {0: [expected_offset]})
1970 expected_offset = struct.pack(">Q", expected_share_hash_offset)
1971 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1972 {0: [expected_offset]})
1973 expected_offset = struct.pack(">Q", expected_signature_offset)
1974 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1975 {0: [expected_offset]})
1976 expected_offset = struct.pack(">Q", expected_verification_key_offset)
1977 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1978 {0: [expected_offset]})
1979 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1980 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1981 {0: [expected_offset]})
1982 expected_offset = struct.pack(">Q", expected_sharedata_offset)
1983 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1984 {0: [expected_offset]})
1985 expected_offset = struct.pack(">Q", expected_block_hash_offset)
1986 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1987 {0: [expected_offset]})
1988 expected_offset = struct.pack(">Q", expected_eof_offset)
1989 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1990 {0: [expected_offset]})
1991 d.addCallback(_check_publish)
# Builds an MDMFSlotWriteProxy for (si, share): seqnum 0, k=3, n=10,
# segment size 6, so a default 36-byte file has six 2-byte blocks.
# NOTE(review): the `return mw` line appears to have been lost in this
# paste — restore it.
1994 def _make_new_mw(self, si, share, datalength=36):
1995 # This is a file of size 36 bytes. Since it has a segment
1996 # size of 6, we know that it has 6 byte segments, which will
1997 # be split into blocks of 2 bytes because our FEC k
1999 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
# A 36-byte/6-segment writer must reject put_block for segment index 7
# (past the last segment) with LayoutInvalid.
# NOTE(review): the `for i in xrange(6):` loop header and `return d` tail
# are elided in this paste.
2004 def test_write_rejected_with_too_many_blocks(self):
2005 mw = self._make_new_mw("si0", 0)
2007 # Try writing too many blocks. We should not be able to write
2009 # blocks into each share.
2010 d = defer.succeed(None)
2012 d.addCallback(lambda ignored, i=i:
2013 mw.put_block(self.block, i, self.salt))
2014 d.addCallback(lambda ignored:
2015 self.shouldFail(LayoutInvalid, "too many blocks",
2017 mw.put_block, self.block, 7, self.salt))
# A 17-byte salt (salts must be 16 bytes) must make put_block raise
# LayoutInvalid.
2021 def test_write_rejected_with_invalid_salt(self):
2022 # Try writing an invalid salt. Salts are 16 bytes -- any more or
2023 # less should cause an error.
2024 mw = self._make_new_mw("si1", 0)
2025 bad_salt = "a" * 17 # 17 bytes
2026 d = defer.succeed(None)
2027 d.addCallback(lambda ignored:
2028 self.shouldFail(LayoutInvalid, "test_invalid_salt",
2029 None, mw.put_block, self.block, 7, bad_salt))
# A 17-byte root hash (must be 32 bytes, SHA256d) must make put_root_hash
# raise LayoutInvalid. The blocks/privkey/hash-trees are written first so
# the failure comes from the bad root hash, not from ordering constraints.
# NOTE(review): the `for` loop header feeding the put_block callbacks is
# elided in this excerpt.
2033 def test_write_rejected_with_invalid_root_hash(self):
2034 # Try writing an invalid root hash. This should be SHA256d, and
2035 # 32 bytes long as a result.
2036 mw = self._make_new_mw("si2", 0)
2037 # 17 bytes != 32 bytes
2038 invalid_root_hash = "a" * 17
2039 d = defer.succeed(None)
2040 # Before this test can work, we need to put some blocks + salts,
2041 # a block hash tree, and a share hash tree. Otherwise, we'll see
2042 # failures that match what we are looking for, but are caused by
2043 # the constraints imposed on operation ordering.
2045 d.addCallback(lambda ignored, i=i:
2046 mw.put_block(self.block, i, self.salt))
2047 d.addCallback(lambda ignored:
2048 mw.put_encprivkey(self.encprivkey))
2049 d.addCallback(lambda ignored:
2050 mw.put_blockhashes(self.block_hash_tree))
2051 d.addCallback(lambda ignored:
2052 mw.put_sharehashes(self.share_hash_chain))
2053 d.addCallback(lambda ignored:
2054 self.shouldFail(LayoutInvalid, "invalid root hash",
2055 None, mw.put_root_hash, invalid_root_hash))
# Blocks that are smaller or larger than the 2-byte blocksize implied by
# _make_new_mw must raise LayoutInvalid; a shorter final (tail) block is
# accepted only at the tail segment index. NOTE(review): the lines defining
# `invalid_block`/`valid_block` and the loop header that writes segments
# 0..4 are elided in this excerpt.
2059 def test_write_rejected_with_invalid_blocksize(self):
2060 # The blocksize implied by the writer that we get from
2061 # _make_new_mw is 2bytes -- any more or any less than this
2062 # should be cause for failure, unless it is the tail segment, in
2063 # which case it may not be failure.
2065 mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2067 # 1 bytes != 2 bytes
2068 d = defer.succeed(None)
2069 d.addCallback(lambda ignored, invalid_block=invalid_block:
2070 self.shouldFail(LayoutInvalid, "test blocksize too small",
2071 None, mw.put_block, invalid_block, 0,
2073 invalid_block = invalid_block * 3
2074 # 3 bytes != 2 bytes
2075 d.addCallback(lambda ignored:
2076 self.shouldFail(LayoutInvalid, "test blocksize too large",
2078 mw.put_block, invalid_block, 0, self.salt))
2080 d.addCallback(lambda ignored, i=i:
2081 mw.put_block(self.block, i, self.salt))
2082 # Try to put an invalid tail segment
2083 d.addCallback(lambda ignored:
2084 self.shouldFail(LayoutInvalid, "test invalid tail segment",
2086 mw.put_block, self.block, 5, self.salt))
2088 d.addCallback(lambda ignored:
2089 mw.put_block(valid_block, 5, self.salt))
# Verify the MDMFSlotWriteProxy write-ordering contract: each out-of-order
# operation below must raise LayoutInvalid until its prerequisites have
# been written (privkey before sharehashes, root hash before signature,
# signature before verification key, verification key before publishing).
2093 def test_write_enforces_order_constraints(self):
2094 # We require that the MDMFSlotWriteProxy be interacted with in a
2098 # 1: write blocks and salts
2099 # 2: Write the encrypted private key
2100 # 3: Write the block hashes
2101 # 4: Write the share hashes
2102 # 5: Write the root hash and salt hash
2103 # 6: Write the signature and verification key
2104 # 7: Write the file.
2106 # Some of these can be performed out-of-order, and some can't.
2107 # The dependencies that I want to test here are:
2108 # - Private key before block hashes
2109 # - share hashes and block hashes before root hash
2110 # - root hash before signature
2111 # - signature before verification key
2112 mw0 = self._make_new_mw("si0", 0)
2114 d = defer.succeed(None)
2116 d.addCallback(lambda ignored, i=i:
2117 mw0.put_block(self.block, i, self.salt))
2119 # Try to write the share hash chain without writing the
2120 # encrypted private key
2121 d.addCallback(lambda ignored:
2122 self.shouldFail(LayoutInvalid, "share hash chain before "
2125 mw0.put_sharehashes, self.share_hash_chain))
2126 # Write the private key.
2127 d.addCallback(lambda ignored:
2128 mw0.put_encprivkey(self.encprivkey))
2130 # Now write the block hashes and try again
2131 d.addCallback(lambda ignored:
2132 mw0.put_blockhashes(self.block_hash_tree))
2134 # We haven't yet put the root hash on the share, so we shouldn't
2135 # be able to sign it.
2136 d.addCallback(lambda ignored:
2137 self.shouldFail(LayoutInvalid, "signature before root hash",
2138 None, mw0.put_signature, self.signature))
2140 d.addCallback(lambda ignored:
2141 self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2143 # ..and, since that fails, we also shouldn't be able to put the
2145 d.addCallback(lambda ignored:
2146 self.shouldFail(LayoutInvalid, "key before signature",
2147 None, mw0.put_verification_key,
2148 self.verification_key))
2150 # Now write the share hashes.
2151 d.addCallback(lambda ignored:
2152 mw0.put_sharehashes(self.share_hash_chain))
2153 # We should be able to write the root hash now too
2154 d.addCallback(lambda ignored:
2155 mw0.put_root_hash(self.root_hash))
2157 # We should still be unable to put the verification key
2158 d.addCallback(lambda ignored:
2159 self.shouldFail(LayoutInvalid, "key before signature",
2160 None, mw0.put_verification_key,
2161 self.verification_key))
2163 d.addCallback(lambda ignored:
2164 mw0.put_signature(self.signature))
2166 # We shouldn't be able to write the offsets to the remote server
2167 # until the offset table is finished; IOW, until we have written
2168 # the verification key.
2169 d.addCallback(lambda ignored:
2170 self.shouldFail(LayoutInvalid, "offsets before verification key",
2172 mw0.finish_publishing))
2174 d.addCallback(lambda ignored:
2175 mw0.put_verification_key(self.verification_key))
# Round-trip test: publish a full MDMF share with MDMFSlotWriteProxy, then
# read every field back with MDMFSlotReadProxy and check each one equals
# what was written (blocks+salts, encprivkey, hash trees, signature,
# verification key, seqnum, root hash, encoding parameters, checkstring).
2179 def test_end_to_end(self):
2180 mw = self._make_new_mw("si1", 0)
2181 # Write a share using the mutable writer, and make sure that the
2182 # reader knows how to read everything back to us.
2183 d = defer.succeed(None)
2185 d.addCallback(lambda ignored, i=i:
2186 mw.put_block(self.block, i, self.salt))
2187 d.addCallback(lambda ignored:
2188 mw.put_encprivkey(self.encprivkey))
2189 d.addCallback(lambda ignored:
2190 mw.put_blockhashes(self.block_hash_tree))
2191 d.addCallback(lambda ignored:
2192 mw.put_sharehashes(self.share_hash_chain))
2193 d.addCallback(lambda ignored:
2194 mw.put_root_hash(self.root_hash))
2195 d.addCallback(lambda ignored:
2196 mw.put_signature(self.signature))
2197 d.addCallback(lambda ignored:
2198 mw.put_verification_key(self.verification_key))
2199 d.addCallback(lambda ignored:
2200 mw.finish_publishing())
2202 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2203 def _check_block_and_salt((block, salt)):
2204 self.failUnlessEqual(block, self.block)
2205 self.failUnlessEqual(salt, self.salt)
2208 d.addCallback(lambda ignored, i=i:
2209 mr.get_block_and_salt(i))
2210 d.addCallback(_check_block_and_salt)
2212 d.addCallback(lambda ignored:
2213 mr.get_encprivkey())
2214 d.addCallback(lambda encprivkey:
2215 self.failUnlessEqual(self.encprivkey, encprivkey))
2217 d.addCallback(lambda ignored:
2218 mr.get_blockhashes())
2219 d.addCallback(lambda blockhashes:
2220 self.failUnlessEqual(self.block_hash_tree, blockhashes))
2222 d.addCallback(lambda ignored:
2223 mr.get_sharehashes())
2224 d.addCallback(lambda sharehashes:
2225 self.failUnlessEqual(self.share_hash_chain, sharehashes))
2227 d.addCallback(lambda ignored:
2229 d.addCallback(lambda signature:
2230 self.failUnlessEqual(signature, self.signature))
2232 d.addCallback(lambda ignored:
2233 mr.get_verification_key())
2234 d.addCallback(lambda verification_key:
2235 self.failUnlessEqual(verification_key, self.verification_key))
2237 d.addCallback(lambda ignored:
2239 d.addCallback(lambda seqnum:
2240 self.failUnlessEqual(seqnum, 0))
2242 d.addCallback(lambda ignored:
2244 d.addCallback(lambda root_hash:
2245 self.failUnlessEqual(self.root_hash, root_hash))
2247 d.addCallback(lambda ignored:
2248 mr.get_encoding_parameters())
2249 def _check_encoding_parameters((k, n, segsize, datalen)):
2250 self.failUnlessEqual(k, 3)
2251 self.failUnlessEqual(n, 10)
2252 self.failUnlessEqual(segsize, 6)
2253 self.failUnlessEqual(datalen, 36)
2254 d.addCallback(_check_encoding_parameters)
2256 d.addCallback(lambda ignored:
2257 mr.get_checkstring())
2258 d.addCallback(lambda checkstring:
2259 self.failUnlessEqual(checkstring, mw.get_checkstring()))
# An SDMF share read through MDMFSlotReadProxy must report is_sdmf == True.
2263 def test_is_sdmf(self):
2264 # The MDMFSlotReadProxy should also know how to read SDMF files,
2265 # since it will encounter them on the grid. Callers use the
2266 # is_sdmf method to test this.
2267 self.write_sdmf_share_to_server("si1")
2268 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2270 d.addCallback(lambda issdmf:
2271 self.failUnless(issdmf))
# Verify that MDMFSlotReadProxy can read back every field of an SDMF share:
# block+salt, block hashes, share hashes, encprivkey, verification key,
# signature, seqnum, and root hash.
2275 def test_reads_sdmf(self):
2276 # The slot read proxy should, naturally, know how to tell us
2277 # about data in the SDMF format
2278 self.write_sdmf_share_to_server("si1")
2279 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2280 d = defer.succeed(None)
2281 d.addCallback(lambda ignored:
2283 d.addCallback(lambda issdmf:
2284 self.failUnless(issdmf))
2286 # What do we need to read?
2289 d.addCallback(lambda ignored:
2290 mr.get_block_and_salt(0))
2291 def _check_block_and_salt(results):
2292 block, salt = results
2293 # Our original file is 36 bytes long. Then each share is 12
2294 # bytes in size. The share is composed entirely of the
2295 # letter a. self.block contains 2 as, so 6 * self.block is
2296 # what we are looking for.
2297 self.failUnlessEqual(block, self.block * 6)
2298 self.failUnlessEqual(salt, self.salt)
2299 d.addCallback(_check_block_and_salt)
2302 d.addCallback(lambda ignored:
2303 mr.get_blockhashes())
2304 d.addCallback(lambda blockhashes:
2305 self.failUnlessEqual(self.block_hash_tree,
2309 d.addCallback(lambda ignored:
2310 mr.get_sharehashes())
2311 d.addCallback(lambda sharehashes:
2312 self.failUnlessEqual(self.share_hash_chain,
2315 d.addCallback(lambda ignored:
2316 mr.get_encprivkey())
2317 d.addCallback(lambda encprivkey:
2318 self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2319 d.addCallback(lambda ignored:
2320 mr.get_verification_key())
2321 d.addCallback(lambda verification_key:
2322 self.failUnlessEqual(verification_key,
2323 self.verification_key,
2326 d.addCallback(lambda ignored:
2328 d.addCallback(lambda signature:
2329 self.failUnlessEqual(signature, self.signature, signature))
2331 # - The sequence number
2332 d.addCallback(lambda ignored:
2334 d.addCallback(lambda seqnum:
2335 self.failUnlessEqual(seqnum, 0, seqnum))
2338 d.addCallback(lambda ignored:
2340 d.addCallback(lambda root_hash:
2341 self.failUnlessEqual(root_hash, self.root_hash, root_hash))
# SDMF shares have exactly one segment; asking the reader for segment 1
# must raise LayoutInvalid.
2345 def test_only_reads_one_segment_sdmf(self):
2346 # SDMF shares have only one segment, so it doesn't make sense to
2347 # read more segments than that. The reader should know this and
2348 # complain if we try to do that.
2349 self.write_sdmf_share_to_server("si1")
2350 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2351 d = defer.succeed(None)
2352 d.addCallback(lambda ignored:
2354 d.addCallback(lambda issdmf:
2355 self.failUnless(issdmf))
2356 d.addCallback(lambda ignored:
2357 self.shouldFail(LayoutInvalid, "test bad segment",
2359 mr.get_block_and_salt, 1))
# When the reader is pre-seeded with the first bytes of an MDMF share, it
# must answer verinfo queries without any remote reads (read_count stays 0)
# and fall back to the remote server only for data beyond the prefetched
# prefix (read_count becomes 1 for the block+salt read).
2363 def test_read_with_prefetched_mdmf_data(self):
2364 # The MDMFSlotReadProxy will prefill certain fields if you pass
2365 # it data that you have already fetched. This is useful for
2366 # cases like the Servermap, which prefetches ~2kb of data while
2367 # finding out which shares are on the remote peer so that it
2368 # doesn't waste round trips.
2369 mdmf_data = self.build_test_mdmf_share()
2370 self.write_test_share_to_server("si1")
2371 def _make_mr(ignored, length):
2372 mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2375 d = defer.succeed(None)
2376 # This should be enough to fill in both the encoding parameters
2377 # and the table of offsets, which will complete the version
2378 # information tuple.
2379 d.addCallback(_make_mr, 123)
2380 d.addCallback(lambda mr:
2382 def _check_verinfo(verinfo):
2383 self.failUnless(verinfo)
2384 self.failUnlessEqual(len(verinfo), 9)
2394 self.failUnlessEqual(seqnum, 0)
2395 self.failUnlessEqual(root_hash, self.root_hash)
2396 self.failUnlessEqual(segsize, 6)
2397 self.failUnlessEqual(datalen, 36)
2398 self.failUnlessEqual(k, 3)
2399 self.failUnlessEqual(n, 10)
2400 expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2408 self.failUnlessEqual(expected_prefix, prefix)
2409 self.failUnlessEqual(self.rref.read_count, 0)
2410 d.addCallback(_check_verinfo)
2411 # This is not enough data to read a block and a share, so the
2412 # wrapper should attempt to read this from the remote server.
2413 d.addCallback(_make_mr, 123)
2414 d.addCallback(lambda mr:
2415 mr.get_block_and_salt(0))
2416 def _check_block_and_salt((block, salt)):
2417 self.failUnlessEqual(block, self.block)
2418 self.failUnlessEqual(salt, self.salt)
2419 self.failUnlessEqual(self.rref.read_count, 1)
2420 # This should be enough data to read one block.
2421 d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2422 d.addCallback(lambda mr:
2423 mr.get_block_and_salt(0))
2424 d.addCallback(_check_block_and_salt)
# Same prefetch behavior as the MDMF variant, but for an SDMF share: a
# 123-byte prefix satisfies verinfo with zero remote reads; share data
# needs remote reads (currently 2 -- see the TODO below) unless the
# prefetched data reaches offsets['share_data'].
2428 def test_read_with_prefetched_sdmf_data(self):
2429 sdmf_data = self.build_test_sdmf_share()
2430 self.write_sdmf_share_to_server("si1")
2431 def _make_mr(ignored, length):
2432 mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2435 d = defer.succeed(None)
2436 # This should be enough to get us the encoding parameters,
2437 # offset table, and everything else we need to build a verinfo
2439 d.addCallback(_make_mr, 123)
2440 d.addCallback(lambda mr:
2442 def _check_verinfo(verinfo):
2443 self.failUnless(verinfo)
2444 self.failUnlessEqual(len(verinfo), 9)
2454 self.failUnlessEqual(seqnum, 0)
2455 self.failUnlessEqual(root_hash, self.root_hash)
2456 self.failUnlessEqual(salt, self.salt)
2457 self.failUnlessEqual(segsize, 36)
2458 self.failUnlessEqual(datalen, 36)
2459 self.failUnlessEqual(k, 3)
2460 self.failUnlessEqual(n, 10)
2461 expected_prefix = struct.pack(SIGNED_PREFIX,
2470 self.failUnlessEqual(expected_prefix, prefix)
2471 self.failUnlessEqual(self.rref.read_count, 0)
2472 d.addCallback(_check_verinfo)
2473 # This shouldn't be enough to read any share data.
2474 d.addCallback(_make_mr, 123)
2475 d.addCallback(lambda mr:
2476 mr.get_block_and_salt(0))
2477 def _check_block_and_salt((block, salt)):
2478 self.failUnlessEqual(block, self.block * 6)
2479 self.failUnlessEqual(salt, self.salt)
2480 # TODO: Fix the read routine so that it reads only the data
2481 # that it has cached if it can't read all of it.
2482 self.failUnlessEqual(self.rref.read_count, 2)
2484 # This should be enough to read share data.
2485 d.addCallback(_make_mr, self.offsets['share_data'])
2486 d.addCallback(lambda mr:
2487 mr.get_block_and_salt(0))
2488 d.addCallback(_check_block_and_salt)
# An empty MDMF share still yields valid encoding parameters (k=3, n=10,
# segsize=0, datalen=0), but fetching block 0 must raise LayoutInvalid.
2492 def test_read_with_empty_mdmf_file(self):
2493 # Some tests upload a file with no contents to test things
2494 # unrelated to the actual handling of the content of the file.
2495 # The reader should behave intelligently in these cases.
2496 self.write_test_share_to_server("si1", empty=True)
2497 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2498 # We should be able to get the encoding parameters, and they
2499 # should be correct.
2500 d = defer.succeed(None)
2501 d.addCallback(lambda ignored:
2502 mr.get_encoding_parameters())
2503 def _check_encoding_parameters(params):
2504 self.failUnlessEqual(len(params), 4)
2505 k, n, segsize, datalen = params
2506 self.failUnlessEqual(k, 3)
2507 self.failUnlessEqual(n, 10)
2508 self.failUnlessEqual(segsize, 0)
2509 self.failUnlessEqual(datalen, 0)
2510 d.addCallback(_check_encoding_parameters)
2512 # We should not be able to fetch a block, since there are no
2514 d.addCallback(lambda ignored:
2515 self.shouldFail(LayoutInvalid, "get block on empty file",
2517 mr.get_block_and_salt, 0))
# Same as the empty-MDMF case but for SDMF: encoding parameters are
# readable (segsize=0, datalen=0); fetching block 0 raises LayoutInvalid.
2521 def test_read_with_empty_sdmf_file(self):
2522 self.write_sdmf_share_to_server("si1", empty=True)
2523 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2524 # We should be able to get the encoding parameters, and they
2526 d = defer.succeed(None)
2527 d.addCallback(lambda ignored:
2528 mr.get_encoding_parameters())
2529 def _check_encoding_parameters(params):
2530 self.failUnlessEqual(len(params), 4)
2531 k, n, segsize, datalen = params
2532 self.failUnlessEqual(k, 3)
2533 self.failUnlessEqual(n, 10)
2534 self.failUnlessEqual(segsize, 0)
2535 self.failUnlessEqual(datalen, 0)
2536 d.addCallback(_check_encoding_parameters)
2538 # It does not make sense to get a block in this format, so we
2539 # should not be able to.
2540 d.addCallback(lambda ignored:
2541 self.shouldFail(LayoutInvalid, "get block on an empty file",
2543 mr.get_block_and_salt, 0))
# The 9-element verinfo tuple for an SDMF share must carry the expected
# seqnum/root hash/salt/encoding parameters, a prefix matching the packed
# SDMF signed header (">BQ32s16s BBQQ"), and the known offset table.
2547 def test_verinfo_with_sdmf_file(self):
2548 self.write_sdmf_share_to_server("si1")
2549 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2550 # We should be able to get the version information.
2551 d = defer.succeed(None)
2552 d.addCallback(lambda ignored:
2554 def _check_verinfo(verinfo):
2555 self.failUnless(verinfo)
2556 self.failUnlessEqual(len(verinfo), 9)
2566 self.failUnlessEqual(seqnum, 0)
2567 self.failUnlessEqual(root_hash, self.root_hash)
2568 self.failUnlessEqual(salt, self.salt)
2569 self.failUnlessEqual(segsize, 36)
2570 self.failUnlessEqual(datalen, 36)
2571 self.failUnlessEqual(k, 3)
2572 self.failUnlessEqual(n, 10)
2573 expected_prefix = struct.pack(">BQ32s16s BBQQ",
2582 self.failUnlessEqual(prefix, expected_prefix)
2583 self.failUnlessEqual(offsets, self.offsets)
2584 d.addCallback(_check_verinfo)
# Same verinfo check for an MDMF share: 9-element tuple, segsize=6,
# prefix packed with ">BQ32s BBQQ" (no salt field, unlike SDMF), and the
# known offset table.
2588 def test_verinfo_with_mdmf_file(self):
2589 self.write_test_share_to_server("si1")
2590 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2591 d = defer.succeed(None)
2592 d.addCallback(lambda ignored:
2594 def _check_verinfo(verinfo):
2595 self.failUnless(verinfo)
2596 self.failUnlessEqual(len(verinfo), 9)
2606 self.failUnlessEqual(seqnum, 0)
2607 self.failUnlessEqual(root_hash, self.root_hash)
2609 self.failUnlessEqual(segsize, 6)
2610 self.failUnlessEqual(datalen, 36)
2611 self.failUnlessEqual(k, 3)
2612 self.failUnlessEqual(n, 10)
2613 expected_prefix = struct.pack(">BQ32s BBQQ",
2621 self.failUnlessEqual(prefix, expected_prefix)
2622 self.failUnlessEqual(offsets, self.offsets)
2623 d.addCallback(_check_verinfo)
# Queued reads (queue=True) must be batched: five queued getters are
# satisfied with exactly two remote reads (one for version info + offsets,
# one for everything else), and each DeferredList entry holds the right
# field value.
2627 def test_reader_queue(self):
2628 self.write_test_share_to_server('si1')
2629 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2630 d1 = mr.get_block_and_salt(0, queue=True)
2631 d2 = mr.get_blockhashes(queue=True)
2632 d3 = mr.get_sharehashes(queue=True)
2633 d4 = mr.get_signature(queue=True)
2634 d5 = mr.get_verification_key(queue=True)
2635 dl = defer.DeferredList([d1, d2, d3, d4, d5])
2637 def _print(results):
2638 self.failUnlessEqual(len(results), 5)
2639 # We have one read for version information and offsets, and
2640 # one for everything else.
2641 self.failUnlessEqual(self.rref.read_count, 2)
2642 block, salt = results[0][1] # results[0] is a boolean that says
2643 # whether or not the operation
2645 self.failUnlessEqual(self.block, block)
2646 self.failUnlessEqual(self.salt, salt)
2648 blockhashes = results[1][1]
2649 self.failUnlessEqual(self.block_hash_tree, blockhashes)
2651 sharehashes = results[2][1]
2652 self.failUnlessEqual(self.share_hash_chain, sharehashes)
2654 signature = results[3][1]
2655 self.failUnlessEqual(self.signature, signature)
2657 verification_key = results[4][1]
2658 self.failUnlessEqual(self.verification_key, verification_key)
2659 dl.addCallback(_print)
# Write a complete SDMF share via SDMFSlotWriteProxy: all puts are
# buffered locally (write_count stays 0), finish_publishing() issues
# exactly one remote write, and the stored bytes equal the reference
# share produced by build_test_sdmf_share().
2663 def test_sdmf_writer(self):
2664 # Go through the motions of writing an SDMF share to the storage
2665 # server. Then read the storage server to see that the share got
2666 # written in the way that we think it should have.
2668 # We do this first so that the necessary instance variables get
2669 # set the way we want them for the tests below.
2670 data = self.build_test_sdmf_share()
2671 sdmfr = SDMFSlotWriteProxy(0,
2676 # Put the block and salt.
2677 sdmfr.put_block(self.blockdata, 0, self.salt)
2679 # Put the encprivkey
2680 sdmfr.put_encprivkey(self.encprivkey)
2682 # Put the block and share hash chains
2683 sdmfr.put_blockhashes(self.block_hash_tree)
2684 sdmfr.put_sharehashes(self.share_hash_chain)
2685 sdmfr.put_root_hash(self.root_hash)
2688 sdmfr.put_signature(self.signature)
2690 # Put the verification key
2691 sdmfr.put_verification_key(self.verification_key)
2693 # Now check to make sure that nothing has been written yet.
2694 self.failUnlessEqual(self.rref.write_count, 0)
2696 # Now finish publishing
2697 d = sdmfr.finish_publishing()
2699 self.failUnlessEqual(self.rref.write_count, 1)
2700 read = self.ss.remote_slot_readv
2701 self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2703 d.addCallback(_then)
# Overwriting an existing SDMF share: the first finish_publishing() fails
# its test-and-set (results[0] is falsy) and returns the server's current
# checkstring; after set_checkstring() with that value the second publish
# succeeds and the on-disk share carries the new seqnum (1).
2707 def test_sdmf_writer_preexisting_share(self):
2708 data = self.build_test_sdmf_share()
2709 self.write_sdmf_share_to_server("si1")
2711 # Now there is a share on the storage server. To successfully
2712 # write, we need to set the checkstring correctly. When we
2713 # don't, no write should occur.
2714 sdmfw = SDMFSlotWriteProxy(0,
2719 sdmfw.put_block(self.blockdata, 0, self.salt)
2721 # Put the encprivkey
2722 sdmfw.put_encprivkey(self.encprivkey)
2724 # Put the block and share hash chains
2725 sdmfw.put_blockhashes(self.block_hash_tree)
2726 sdmfw.put_sharehashes(self.share_hash_chain)
2729 sdmfw.put_root_hash(self.root_hash)
2732 sdmfw.put_signature(self.signature)
2734 # Put the verification key
2735 sdmfw.put_verification_key(self.verification_key)
2737 # We shouldn't have a checkstring yet
2738 self.failUnlessEqual(sdmfw.get_checkstring(), "")
2740 d = sdmfw.finish_publishing()
2742 self.failIf(results[0])
2743 # this is the correct checkstring
2744 self._expected_checkstring = results[1][0][0]
2745 return self._expected_checkstring
2747 d.addCallback(_then)
2748 d.addCallback(sdmfw.set_checkstring)
2749 d.addCallback(lambda ignored:
2750 sdmfw.get_checkstring())
2751 d.addCallback(lambda checkstring:
2752 self.failUnlessEqual(checkstring, self._expected_checkstring))
2753 d.addCallback(lambda ignored:
2754 sdmfw.finish_publishing())
2755 def _then_again(results):
2756 self.failUnless(results[0])
2757 read = self.ss.remote_slot_readv
2758 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2759 {0: [struct.pack(">Q", 1)]})
2760 self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2762 d.addCallback(_then_again)
# Tests for StorageServer latency statistics. test_latencies feeds known
# sample counts into add_latency() and checks the mean and each percentile
# bucket that get_latencies() reports (latency buffers are capped at 1000
# samples, per the "allocate" assertions; percentiles that lack enough
# samples come back as None).
# NOTE(review): the setUp/tearDown `def` lines and some loop headers are
# elided in this excerpt -- 2769/2770 and 2772 look like setUp and
# tearDown bodies; confirm against the full file.
2766 class Stats(unittest.TestCase):
2769 self.sparent = LoggingServiceParent()
2770 self._lease_secret = itertools.count()
2772 return self.sparent.stopService()
2774 def workdir(self, name):
2775 basedir = os.path.join("storage", "Server", name)
2778 def create(self, name):
2779 workdir = self.workdir(name)
2780 ss = StorageServer(workdir, "\x00" * 20)
2781 ss.setServiceParent(self.sparent)
2784 def test_latencies(self):
2785 ss = self.create("test_latencies")
2786 for i in range(10000):
2787 ss.add_latency("allocate", 1.0 * i)
2788 for i in range(1000):
2789 ss.add_latency("renew", 1.0 * i)
2791 ss.add_latency("write", 1.0 * i)
2793 ss.add_latency("cancel", 2.0 * i)
2794 ss.add_latency("get", 5.0)
2796 output = ss.get_latencies()
2798 self.failUnlessEqual(sorted(output.keys()),
2799 sorted(["allocate", "renew", "cancel", "write", "get"]))
2800 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2801 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2802 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2803 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2804 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2805 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2806 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2807 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2808 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2810 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2811 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2812 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
2813 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2814 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2815 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2816 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2817 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2818 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2820 self.failUnlessEqual(len(ss.latencies["write"]), 20)
2821 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2822 self.failUnless(output["write"]["01_0_percentile"] is None, output)
2823 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
2824 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2825 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2826 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2827 self.failUnless(output["write"]["99_0_percentile"] is None, output)
2828 self.failUnless(output["write"]["99_9_percentile"] is None, output)
2830 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2831 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2832 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2833 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
2834 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2835 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2836 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2837 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2838 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2840 self.failUnlessEqual(len(ss.latencies["get"]), 1)
2841 self.failUnless(output["get"]["mean"] is None, output)
2842 self.failUnless(output["get"]["01_0_percentile"] is None, output)
2843 self.failUnless(output["get"]["10_0_percentile"] is None, output)
2844 self.failUnless(output["get"]["50_0_percentile"] is None, output)
2845 self.failUnless(output["get"]["90_0_percentile"] is None, output)
2846 self.failUnless(output["get"]["95_0_percentile"] is None, output)
2847 self.failUnless(output["get"]["99_0_percentile"] is None, output)
2848 self.failUnless(output["get"]["99_9_percentile"] is None, output)
# Fragment: strips angle-bracket markup and collapses runs of whitespace
# to single spaces. NOTE(review): the enclosing `def` line is elided in
# this excerpt -- presumably the module-level helper used as
# `remove_tags(html)` by the crawler tests below; confirm against the
# full file.
2851 s = re.sub(r'<[^>]*>', ' ', s)
2852 s = re.sub(r'\s+', ' ', s)
# Test double: after each prefix completes, pops the next Deferred from
# self.hook_ds so tests can observe crawler progress prefix-by-prefix.
# NOTE(review): the line that fires the popped Deferred is elided in this
# excerpt.
2855 class MyBucketCountingCrawler(BucketCountingCrawler):
2856 def finished_prefix(self, cycle, prefix):
2857 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2859 d = self.hook_ds.pop(0)
# StorageServer variant whose bucket counter is the hookable
# MyBucketCountingCrawler above (same state file location as the normal
# counter).
2862 class MyStorageServer(StorageServer):
2863 def add_bucket_counter(self):
2864 statefile = os.path.join(self.storedir, "bucket_counter.state")
2865 self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2866 self.bucket_counter.setServiceParent(self)
# Tests for BucketCountingCrawler behavior as reflected in the
# StorageStatus web page: initial "Not computed yet" state, mid-cycle
# progress text, post-cycle totals and next-crawl ETA, cleanup of stale
# state keys, and the ETA becoming available after the first prefix.
# Crawl speed is controlled by poking slow_start/cpu_slice directly.
2868 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2871 self.s = service.MultiService()
2872 self.s.startService()
2874 return self.s.stopService()
2876 def test_bucket_counter(self):
2877 basedir = "storage/BucketCounter/bucket_counter"
2878 fileutil.make_dirs(basedir)
2879 ss = StorageServer(basedir, "\x00" * 20)
2880 # to make sure we capture the bucket-counting-crawler in the middle
2881 # of a cycle, we reach in and reduce its maximum slice time to 0. We
2882 # also make it start sooner than usual.
2883 ss.bucket_counter.slow_start = 0
2884 orig_cpu_slice = ss.bucket_counter.cpu_slice
2885 ss.bucket_counter.cpu_slice = 0
2886 ss.setServiceParent(self.s)
2888 w = StorageStatus(ss)
2890 # this sample is before the crawler has started doing anything
2891 html = w.renderSynchronously()
2892 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2893 s = remove_tags(html)
2894 self.failUnlessIn("Accepting new shares: Yes", s)
2895 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2896 self.failUnlessIn("Total buckets: Not computed yet", s)
2897 self.failUnlessIn("Next crawl in", s)
2899 # give the bucket-counting-crawler one tick to get started. The
2900 # cpu_slice=0 will force it to yield right after it processes the
2903 d = fireEventually()
2904 def _check(ignored):
2905 # are we really right after the first prefix?
2906 state = ss.bucket_counter.get_state()
2907 if state["last-complete-prefix"] is None:
# re-poll until the first prefix has completed
2908 d2 = fireEventually()
2909 d2.addCallback(_check)
2911 self.failUnlessEqual(state["last-complete-prefix"],
2912 ss.bucket_counter.prefixes[0])
2913 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2914 html = w.renderSynchronously()
2915 s = remove_tags(html)
2916 self.failUnlessIn(" Current crawl ", s)
2917 self.failUnlessIn(" (next work in ", s)
2918 d.addCallback(_check)
2920 # now give it enough time to complete a full cycle
2922 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2923 d.addCallback(lambda ignored: self.poll(_watch))
2924 def _check2(ignored):
2925 ss.bucket_counter.cpu_slice = orig_cpu_slice
2926 html = w.renderSynchronously()
2927 s = remove_tags(html)
2928 self.failUnlessIn("Total buckets: 0 (the number of", s)
2929 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2930 d.addCallback(_check2)
2933 def test_bucket_counter_cleanup(self):
2934 basedir = "storage/BucketCounter/bucket_counter_cleanup"
2935 fileutil.make_dirs(basedir)
2936 ss = StorageServer(basedir, "\x00" * 20)
2937 # to make sure we capture the bucket-counting-crawler in the middle
2938 # of a cycle, we reach in and reduce its maximum slice time to 0.
2939 ss.bucket_counter.slow_start = 0
2940 orig_cpu_slice = ss.bucket_counter.cpu_slice
2941 ss.bucket_counter.cpu_slice = 0
2942 ss.setServiceParent(self.s)
2944 d = fireEventually()
2946 def _after_first_prefix(ignored):
2947 state = ss.bucket_counter.state
2948 if state["last-complete-prefix"] is None:
# re-poll until the first prefix has completed
2949 d2 = fireEventually()
2950 d2.addCallback(_after_first_prefix)
2952 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2953 # now sneak in and mess with its state, to make sure it cleans up
2954 # properly at the end of the cycle
2955 self.failUnlessEqual(state["last-complete-prefix"],
2956 ss.bucket_counter.prefixes[0])
2957 state["bucket-counts"][-12] = {}
2958 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2959 ss.bucket_counter.save_state()
2960 d.addCallback(_after_first_prefix)
2962 # now give it enough time to complete a cycle
2964 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2965 d.addCallback(lambda ignored: self.poll(_watch))
2966 def _check2(ignored):
2967 ss.bucket_counter.cpu_slice = orig_cpu_slice
2968 s = ss.bucket_counter.get_state()
# the bogus entries injected above must have been pruned
2969 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2970 self.failIf("bogusprefix!" in s["storage-index-samples"],
2971 s["storage-index-samples"].keys())
2972 d.addCallback(_check2)
2975 def test_bucket_counter_eta(self):
2976 basedir = "storage/BucketCounter/bucket_counter_eta"
2977 fileutil.make_dirs(basedir)
2978 ss = MyStorageServer(basedir, "\x00" * 20)
2979 ss.bucket_counter.slow_start = 0
2980 # these will be fired inside finished_prefix()
2981 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2982 w = StorageStatus(ss)
2984 d = defer.Deferred()
2986 def _check_1(ignored):
2987 # no ETA is available yet
2988 html = w.renderSynchronously()
2989 s = remove_tags(html)
2990 self.failUnlessIn("complete (next work", s)
2992 def _check_2(ignored):
2993 # one prefix has finished, so an ETA based upon that elapsed time
2994 # should be available.
2995 html = w.renderSynchronously()
2996 s = remove_tags(html)
2997 self.failUnlessIn("complete (ETA ", s)
2999 def _check_3(ignored):
3000 # two prefixes have finished
3001 html = w.renderSynchronously()
3002 s = remove_tags(html)
3003 self.failUnlessIn("complete (ETA ", s)
3006 hooks[0].addCallback(_check_1).addErrback(d.errback)
3007 hooks[1].addCallback(_check_2).addErrback(d.errback)
3008 hooks[2].addCallback(_check_3).addErrback(d.errback)
3010 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """A LeaseCheckingCrawler that tests can pause right after the first
    bucket it processes, so mid-cycle state can be inspected."""
    # one-shot flag: when True, the crawler yields immediately after the
    # first bucket it handles
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        # Do the real work first, then (once) exhaust the CPU-slice budget
        # so the crawler gives up control straight away.
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # After the one-shot pause has been consumed, allow very long
        # slices so the rest of the cycle finishes quickly.
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
# Empty stand-in for an os.stat() result; attributes are copied onto it
# selectively (st_blocks deliberately omitted) by the crawler subclass
# below. NOTE(review): the class body (presumably just 'pass') appears
# elided from this extraction.
3024 class BrokenStatResults:
# Crawler whose stat() results lack the .st_blocks field, simulating
# platforms (e.g. Windows) where os.stat() does not provide it.
# NOTE(review): gaps in this extraction — the 'def stat(...)' line, the
# 'continue' bodies of the two filters, and the final 'return bsr' appear
# elided; visible code kept verbatim.
3026 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3029 bsr = BrokenStatResults()
# copy every public stat attribute except st_blocks onto the stand-in
3030 for attrname in dir(s):
3031 if attrname.startswith("_"):
3033 if attrname == "st_blocks":
3035 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    """StorageServer variant whose lease checker can be paused mid-cycle
    (see InstrumentedLeaseCheckingCrawler)."""
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer variant whose lease checker sees stat() results
    without .st_blocks (see No_ST_BLOCKS_LeaseCheckingCrawler)."""
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3043 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3046 self.s = service.MultiService()
3047 self.s.startService()
3049 return self.s.stopService()
# Populate the storage server with a fixed fixture: two immutable shares
# and two mutable shares (one of each carrying an extra, second lease),
# and record their storage indexes and lease secrets on self for the
# tests to use.
# NOTE(review): gaps in this extraction — the inner 'def make(si):'
# header, allocation kwargs/remote_close() calls, and some blank lines
# appear elided; visible code kept verbatim.
3051 def make_shares(self, ss):
# secrets for an immutable share: (storage_index, renew, cancel)
3053 return (si, hashutil.tagged_hash("renew", si),
3054 hashutil.tagged_hash("cancel", si))
3055 def make_mutable(si):
# mutable shares additionally need a write-enabler
3056 return (si, hashutil.tagged_hash("renew", si),
3057 hashutil.tagged_hash("cancel", si),
3058 hashutil.tagged_hash("write-enabler", si))
3059 def make_extra_lease(si, num):
# distinct secrets for a second lease on the same share
3060 return (hashutil.tagged_hash("renew-%d" % num, si),
3061 hashutil.tagged_hash("cancel-%d" % num, si))
3063 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3064 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3065 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3066 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3067 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3068 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3070 canary = FakeCanary()
3071 # note: 'tahoe debug dump-share' will not handle this file, since the
3072 # inner contents are not a valid CHK share
3073 data = "\xff" * 1000
3075 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3077 w[0].remote_write(0, data)
3080 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3082 w[0].remote_write(0, data)
# second lease on immutable_si_1
3084 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3086 writev = ss.remote_slot_testv_and_readv_and_writev
3087 writev(mutable_si_2, (we2, rs2, cs2),
3088 {0: ([], [(0,data)], len(data))}, [])
3089 writev(mutable_si_3, (we3, rs3, cs3),
3090 {0: ([], [(0,data)], len(data))}, [])
# second lease on mutable_si_3
3091 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
# stash the fixture for the tests
3093 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3094 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3095 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
# End-to-end check of the lease crawler with expiration disabled: verify
# the crawler's state before the cycle, mid-cycle (paused after the first
# bucket), and after the first full cycle, plus the HTML and JSON status
# renderings. No leases should be expired.
# NOTE(review): gaps in this extraction — e.g. slow_start/cpu_slice
# setup, 'return d2', 's = lc.get_state()', file open/close, and the
# final 'return d' lines appear elided; visible code kept verbatim.
3097 def test_basic(self):
3098 basedir = "storage/LeaseCrawler/basic"
3099 fileutil.make_dirs(basedir)
3100 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3101 # make it start sooner than usual.
3102 lc = ss.lease_checker
3105 lc.stop_after_first_bucket = True
3106 webstatus = StorageStatus(ss)
3108 # create a few shares, with some leases on them
3109 self.make_shares(ss)
3110 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3112 # add a non-sharefile to exercise another code path
3113 fn = os.path.join(ss.sharedir,
3114 storage_index_to_dir(immutable_si_0),
3117 f.write("I am not a share.\n")
3120 # this is before the crawl has started, so we're not in a cycle yet
3121 initial_state = lc.get_state()
3122 self.failIf(lc.get_progress()["cycle-in-progress"])
3123 self.failIfIn("cycle-to-date", initial_state)
3124 self.failIfIn("estimated-remaining-cycle", initial_state)
3125 self.failIfIn("estimated-current-cycle", initial_state)
3126 self.failUnlessIn("history", initial_state)
3127 self.failUnlessEqual(initial_state["history"], {})
3129 ss.setServiceParent(self.s)
3133 d = fireEventually()
3135 # now examine the state right after the first bucket has been
3137 def _after_first_bucket(ignored):
3138 initial_state = lc.get_state()
# crawler may not have reached the first bucket yet: reschedule
3139 if "cycle-to-date" not in initial_state:
3140 d2 = fireEventually()
3141 d2.addCallback(_after_first_bucket)
3143 self.failUnlessIn("cycle-to-date", initial_state)
3144 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3145 self.failUnlessIn("estimated-current-cycle", initial_state)
3146 self.failUnlessIn("history", initial_state)
3147 self.failUnlessEqual(initial_state["history"], {})
3149 so_far = initial_state["cycle-to-date"]
3150 self.failUnlessEqual(so_far["expiration-enabled"], False)
3151 self.failUnlessIn("configured-expiration-mode", so_far)
3152 self.failUnlessIn("lease-age-histogram", so_far)
3153 lah = so_far["lease-age-histogram"]
3154 self.failUnlessEqual(type(lah), list)
3155 self.failUnlessEqual(len(lah), 1)
# one bucket seen so far: a single fresh lease in the 0..1-day bin
3156 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3157 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3158 self.failUnlessEqual(so_far["corrupt-shares"], [])
3159 sr1 = so_far["space-recovered"]
3160 self.failUnlessEqual(sr1["examined-buckets"], 1)
3161 self.failUnlessEqual(sr1["examined-shares"], 1)
3162 self.failUnlessEqual(sr1["actual-shares"], 0)
3163 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3164 self.failUnlessEqual(sr1["original-sharebytes"], 0)
# remaining-cycle estimate must be populated (non-None) mid-cycle
3165 left = initial_state["estimated-remaining-cycle"]
3166 sr2 = left["space-recovered"]
3167 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3168 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3169 self.failIfEqual(sr2["actual-shares"], None)
3170 self.failIfEqual(sr2["configured-diskbytes"], None)
3171 self.failIfEqual(sr2["original-sharebytes"], None)
3172 d.addCallback(_after_first_bucket)
3173 d.addCallback(lambda ign: self.render1(webstatus))
3174 def _check_html_in_cycle(html):
3175 s = remove_tags(html)
3176 self.failUnlessIn("So far, this cycle has examined "
3177 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3178 self.failUnlessIn("and has recovered: "
3179 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3180 "0 B (0 B / 0 B)", s)
3181 self.failUnlessIn("If expiration were enabled, "
3182 "we would have recovered: "
3183 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3184 " 0 B (0 B / 0 B) by now", s)
3185 self.failUnlessIn("and the remainder of this cycle "
3186 "would probably recover: "
3187 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3188 " 0 B (0 B / 0 B)", s)
3189 self.failUnlessIn("and the whole cycle would probably recover: "
3190 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3191 " 0 B (0 B / 0 B)", s)
3192 self.failUnlessIn("if we were strictly using each lease's default "
3193 "31-day lease lifetime", s)
3194 self.failUnlessIn("this cycle would be expected to recover: ", s)
3195 d.addCallback(_check_html_in_cycle)
3197 # wait for the crawler to finish the first cycle. Nothing should have
3200 return bool(lc.get_state()["last-cycle-finished"] is not None)
3201 d.addCallback(lambda ign: self.poll(_wait))
3203 def _after_first_cycle(ignored):
# mid-cycle keys must be gone once the cycle is complete
3205 self.failIf("cycle-to-date" in s)
3206 self.failIf("estimated-remaining-cycle" in s)
3207 self.failIf("estimated-current-cycle" in s)
3208 last = s["history"][0]
3209 self.failUnlessIn("cycle-start-finish-times", last)
3210 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3211 self.failUnlessEqual(last["expiration-enabled"], False)
3212 self.failUnlessIn("configured-expiration-mode", last)
3214 self.failUnlessIn("lease-age-histogram", last)
3215 lah = last["lease-age-histogram"]
3216 self.failUnlessEqual(type(lah), list)
3217 self.failUnlessEqual(len(lah), 1)
# full cycle: 6 leases total across the 4 shares, all fresh
3218 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3220 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3221 self.failUnlessEqual(last["corrupt-shares"], [])
# nothing recovered: expiration is disabled in this test
3223 rec = last["space-recovered"]
3224 self.failUnlessEqual(rec["examined-buckets"], 4)
3225 self.failUnlessEqual(rec["examined-shares"], 4)
3226 self.failUnlessEqual(rec["actual-buckets"], 0)
3227 self.failUnlessEqual(rec["original-buckets"], 0)
3228 self.failUnlessEqual(rec["configured-buckets"], 0)
3229 self.failUnlessEqual(rec["actual-shares"], 0)
3230 self.failUnlessEqual(rec["original-shares"], 0)
3231 self.failUnlessEqual(rec["configured-shares"], 0)
3232 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3233 self.failUnlessEqual(rec["original-diskbytes"], 0)
3234 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3235 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3236 self.failUnlessEqual(rec["original-sharebytes"], 0)
3237 self.failUnlessEqual(rec["configured-sharebytes"], 0)
3239 def _get_sharefile(si):
3240 return list(ss._iter_share_files(si))[0]
3241 def count_leases(si):
3242 return len(list(_get_sharefile(si).get_leases()))
# the fixture's lease counts are untouched
3243 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3244 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3245 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3246 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3247 d.addCallback(_after_first_cycle)
3248 d.addCallback(lambda ign: self.render1(webstatus))
3249 def _check_html(html):
3250 s = remove_tags(html)
3251 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3252 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3253 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3254 "(2 mutable / 2 immutable),", s)
3255 self.failUnlessIn("but expiration was not enabled", s)
3256 d.addCallback(_check_html)
3257 d.addCallback(lambda ign: self.render_json(webstatus))
3258 def _check_json(json):
3259 data = simplejson.loads(json)
3260 self.failUnlessIn("lease-checker", data)
3261 self.failUnlessIn("lease-checker-progress", data)
3262 d.addCallback(_check_json)
# Force a lease's expiration_time backwards by rewriting its record
# directly in the share file (the public renew API refuses back-dating).
# Raises IndexError if no lease matches renew_secret.
# NOTE(review): gaps in this extraction — the lines between the write and
# the raise (presumably 'f.close()' and a 'return') appear elided;
# without them the loop would fall through to the raise. Visible code
# kept verbatim.
3265 def backdate_lease(self, sf, renew_secret, new_expire_time):
3266 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3267 # "renew" a lease with a new_expire_time that is older than what the
3268 # current lease has), so we have to reach inside it.
3269 for i,lease in enumerate(sf.get_leases()):
3270 if lease.renew_secret == renew_secret:
3271 lease.expiration_time = new_expire_time
3272 f = open(sf.home, 'rb+')
3273 sf._write_lease_record(f, i, lease)
3276 raise IndexError("unable to renew non-existent lease")
# Lease crawler with age-based expiration (leases older than 2000s are
# expired): back-date one lease on each share, run a cycle, and verify
# that single-lease shares are deleted while double-lease shares survive
# with one lease, plus the recovered-space accounting and HTML status.
# NOTE(review): gaps in this extraction — slow_start setup, 'return d2',
# 'now = time.time()', 's = lc.get_state()' and similar lines appear
# elided; visible code kept verbatim.
3278 def test_expire_age(self):
3279 basedir = "storage/LeaseCrawler/expire_age"
3280 fileutil.make_dirs(basedir)
3281 # setting expiration_time to 2000 means that any lease which is more
3282 # than 2000s old will be expired.
3283 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3284 expiration_enabled=True,
3285 expiration_mode="age",
3286 expiration_override_lease_duration=2000)
3287 # make it start sooner than usual.
3288 lc = ss.lease_checker
3290 lc.stop_after_first_bucket = True
3291 webstatus = StorageStatus(ss)
3293 # create a few shares, with some leases on them
3294 self.make_shares(ss)
3295 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3297 def count_shares(si):
3298 return len(list(ss._iter_share_files(si)))
3299 def _get_sharefile(si):
3300 return list(ss._iter_share_files(si))[0]
3301 def count_leases(si):
3302 return len(list(_get_sharefile(si).get_leases()))
# sanity-check the fixture before expiring anything
3304 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3305 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3306 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3307 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3308 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3309 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3310 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3311 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3313 # artificially crank back the expiration time on the first lease of
3314 # each share, to make it look like it expired already (age=1000s).
3315 # Some shares have an extra lease which is set to expire at the
3316 # default time in 31 days from now (age=31days). We then run the
3317 # crawler, which will expire the first lease, making some shares get
3318 # deleted and others stay alive (with one remaining lease)
3321 sf0 = _get_sharefile(immutable_si_0)
3322 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
# remember sizes of the shares that will be deleted, for accounting
3323 sf0_size = os.stat(sf0.home).st_size
3325 # immutable_si_1 gets an extra lease
3326 sf1 = _get_sharefile(immutable_si_1)
3327 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3329 sf2 = _get_sharefile(mutable_si_2)
3330 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3331 sf2_size = os.stat(sf2.home).st_size
3333 # mutable_si_3 gets an extra lease
3334 sf3 = _get_sharefile(mutable_si_3)
3335 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3337 ss.setServiceParent(self.s)
3339 d = fireEventually()
3340 # examine the state right after the first bucket has been processed
3341 def _after_first_bucket(ignored):
3342 p = lc.get_progress()
3343 if not p["cycle-in-progress"]:
3344 d2 = fireEventually()
3345 d2.addCallback(_after_first_bucket)
3347 d.addCallback(_after_first_bucket)
3348 d.addCallback(lambda ign: self.render1(webstatus))
3349 def _check_html_in_cycle(html):
3350 s = remove_tags(html)
3351 # the first bucket encountered gets deleted, and its prefix
3352 # happens to be about 1/5th of the way through the ring, so the
3353 # predictor thinks we'll have 5 shares and that we'll delete them
3354 # all. This part of the test depends upon the SIs landing right
3355 # where they do now.
3356 self.failUnlessIn("The remainder of this cycle is expected to "
3357 "recover: 4 shares, 4 buckets", s)
3358 self.failUnlessIn("The whole cycle is expected to examine "
3359 "5 shares in 5 buckets and to recover: "
3360 "5 shares, 5 buckets", s)
3361 d.addCallback(_check_html_in_cycle)
3363 # wait for the crawler to finish the first cycle. Two shares should
3366 return bool(lc.get_state()["last-cycle-finished"] is not None)
3367 d.addCallback(lambda ign: self.poll(_wait))
3369 def _after_first_cycle(ignored):
# single-lease shares expired and were deleted; double-lease
# shares lost one lease but survive
3370 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3371 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3372 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3373 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3374 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3375 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3378 last = s["history"][0]
3380 self.failUnlessEqual(last["expiration-enabled"], True)
3381 self.failUnlessEqual(last["configured-expiration-mode"],
3382 ("age", 2000, None, ("mutable", "immutable")))
3383 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3385 rec = last["space-recovered"]
3386 self.failUnlessEqual(rec["examined-buckets"], 4)
3387 self.failUnlessEqual(rec["examined-shares"], 4)
3388 self.failUnlessEqual(rec["actual-buckets"], 2)
3389 self.failUnlessEqual(rec["original-buckets"], 2)
3390 self.failUnlessEqual(rec["configured-buckets"], 2)
3391 self.failUnlessEqual(rec["actual-shares"], 2)
3392 self.failUnlessEqual(rec["original-shares"], 2)
3393 self.failUnlessEqual(rec["configured-shares"], 2)
3394 size = sf0_size + sf2_size
3395 self.failUnlessEqual(rec["actual-sharebytes"], size)
3396 self.failUnlessEqual(rec["original-sharebytes"], size)
3397 self.failUnlessEqual(rec["configured-sharebytes"], size)
3398 # different platforms have different notions of "blocks used by
3399 # this file", so merely assert that it's a number
3400 self.failUnless(rec["actual-diskbytes"] >= 0,
3401 rec["actual-diskbytes"])
3402 self.failUnless(rec["original-diskbytes"] >= 0,
3403 rec["original-diskbytes"])
3404 self.failUnless(rec["configured-diskbytes"] >= 0,
3405 rec["configured-diskbytes"])
3406 d.addCallback(_after_first_cycle)
3407 d.addCallback(lambda ign: self.render1(webstatus))
3408 def _check_html(html):
3409 s = remove_tags(html)
3410 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3411 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3412 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3413 d.addCallback(_check_html)
# Same scenario as test_expire_age but using cutoff-date expiration:
# leases last renewed before 'then' (2000s ago) are expired. Note that
# in this mode the "original-*" recovered counts stay 0 while
# "actual-*"/"configured-*" reflect the deletions.
# NOTE(review): gaps in this extraction — 'now = time.time()',
# slow_start setup, 'return d2', 's = lc.get_state()' and similar lines
# appear elided; visible code kept verbatim.
3416 def test_expire_cutoff_date(self):
3417 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3418 fileutil.make_dirs(basedir)
3419 # setting cutoff-date to 2000 seconds ago means that any lease which
3420 # is more than 2000s old will be expired.
3422 then = int(now - 2000)
3423 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3424 expiration_enabled=True,
3425 expiration_mode="cutoff-date",
3426 expiration_cutoff_date=then)
3427 # make it start sooner than usual.
3428 lc = ss.lease_checker
3430 lc.stop_after_first_bucket = True
3431 webstatus = StorageStatus(ss)
3433 # create a few shares, with some leases on them
3434 self.make_shares(ss)
3435 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3437 def count_shares(si):
3438 return len(list(ss._iter_share_files(si)))
3439 def _get_sharefile(si):
3440 return list(ss._iter_share_files(si))[0]
3441 def count_leases(si):
3442 return len(list(_get_sharefile(si).get_leases()))
# sanity-check the fixture before expiring anything
3444 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3445 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3446 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3447 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3448 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3449 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3450 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3451 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3453 # artificially crank back the expiration time on the first lease of
3454 # each share, to make it look like was renewed 3000s ago. To achieve
3455 # this, we need to set the expiration time to now-3000+31days. This
3456 # will change when the lease format is improved to contain both
3457 # create/renew time and duration.
3458 new_expiration_time = now - 3000 + 31*24*60*60
3460 # Some shares have an extra lease which is set to expire at the
3461 # default time in 31 days from now (age=31days). We then run the
3462 # crawler, which will expire the first lease, making some shares get
3463 # deleted and others stay alive (with one remaining lease)
3465 sf0 = _get_sharefile(immutable_si_0)
3466 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
# remember sizes of the shares that will be deleted, for accounting
3467 sf0_size = os.stat(sf0.home).st_size
3469 # immutable_si_1 gets an extra lease
3470 sf1 = _get_sharefile(immutable_si_1)
3471 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3473 sf2 = _get_sharefile(mutable_si_2)
3474 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3475 sf2_size = os.stat(sf2.home).st_size
3477 # mutable_si_3 gets an extra lease
3478 sf3 = _get_sharefile(mutable_si_3)
3479 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3481 ss.setServiceParent(self.s)
3483 d = fireEventually()
3484 # examine the state right after the first bucket has been processed
3485 def _after_first_bucket(ignored):
3486 p = lc.get_progress()
3487 if not p["cycle-in-progress"]:
3488 d2 = fireEventually()
3489 d2.addCallback(_after_first_bucket)
3491 d.addCallback(_after_first_bucket)
3492 d.addCallback(lambda ign: self.render1(webstatus))
3493 def _check_html_in_cycle(html):
3494 s = remove_tags(html)
3495 # the first bucket encountered gets deleted, and its prefix
3496 # happens to be about 1/5th of the way through the ring, so the
3497 # predictor thinks we'll have 5 shares and that we'll delete them
3498 # all. This part of the test depends upon the SIs landing right
3499 # where they do now.
3500 self.failUnlessIn("The remainder of this cycle is expected to "
3501 "recover: 4 shares, 4 buckets", s)
3502 self.failUnlessIn("The whole cycle is expected to examine "
3503 "5 shares in 5 buckets and to recover: "
3504 "5 shares, 5 buckets", s)
3505 d.addCallback(_check_html_in_cycle)
3507 # wait for the crawler to finish the first cycle. Two shares should
3510 return bool(lc.get_state()["last-cycle-finished"] is not None)
3511 d.addCallback(lambda ign: self.poll(_wait))
3513 def _after_first_cycle(ignored):
# single-lease shares expired and were deleted; double-lease
# shares lost one lease but survive
3514 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3515 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3516 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3517 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3518 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3519 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3522 last = s["history"][0]
3524 self.failUnlessEqual(last["expiration-enabled"], True)
3525 self.failUnlessEqual(last["configured-expiration-mode"],
3526 ("cutoff-date", None, then,
3527 ("mutable", "immutable")))
3528 self.failUnlessEqual(last["leases-per-share-histogram"],
3531 rec = last["space-recovered"]
3532 self.failUnlessEqual(rec["examined-buckets"], 4)
3533 self.failUnlessEqual(rec["examined-shares"], 4)
3534 self.failUnlessEqual(rec["actual-buckets"], 2)
# "original-*" counts remain 0 in cutoff-date mode
3535 self.failUnlessEqual(rec["original-buckets"], 0)
3536 self.failUnlessEqual(rec["configured-buckets"], 2)
3537 self.failUnlessEqual(rec["actual-shares"], 2)
3538 self.failUnlessEqual(rec["original-shares"], 0)
3539 self.failUnlessEqual(rec["configured-shares"], 2)
3540 size = sf0_size + sf2_size
3541 self.failUnlessEqual(rec["actual-sharebytes"], size)
3542 self.failUnlessEqual(rec["original-sharebytes"], 0)
3543 self.failUnlessEqual(rec["configured-sharebytes"], size)
3544 # different platforms have different notions of "blocks used by
3545 # this file", so merely assert that it's a number
3546 self.failUnless(rec["actual-diskbytes"] >= 0,
3547 rec["actual-diskbytes"])
3548 self.failUnless(rec["original-diskbytes"] >= 0,
3549 rec["original-diskbytes"])
3550 self.failUnless(rec["configured-diskbytes"] >= 0,
3551 rec["configured-diskbytes"])
3552 d.addCallback(_after_first_cycle)
3553 d.addCallback(lambda ign: self.render1(webstatus))
3554 def _check_html(html):
3555 s = remove_tags(html)
3556 self.failUnlessIn("Expiration Enabled:"
3557 " expired leases will be removed", s)
3558 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3559 substr = "Leases created or last renewed before %s will be considered expired." % date
3560 self.failUnlessIn(substr, s)
3561 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3562 d.addCallback(_check_html)
# With expiration_sharetypes=("immutable",), only immutable shares are
# expired: all leases are made expirable, a cycle runs, and only the two
# immutable shares are deleted while both mutable shares (and all their
# leases) survive.
# NOTE(review): gaps in this extraction — 'now = time.time()',
# slow_start setup, and the 'def _wait():' header appear elided; visible
# code kept verbatim.
3565 def test_only_immutable(self):
3566 basedir = "storage/LeaseCrawler/only_immutable"
3567 fileutil.make_dirs(basedir)
3569 then = int(now - 2000)
3570 ss = StorageServer(basedir, "\x00" * 20,
3571 expiration_enabled=True,
3572 expiration_mode="cutoff-date",
3573 expiration_cutoff_date=then,
3574 expiration_sharetypes=("immutable",))
3575 lc = ss.lease_checker
3577 webstatus = StorageStatus(ss)
3579 self.make_shares(ss)
3580 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3581 # set all leases to be expirable
3582 new_expiration_time = now - 3000 + 31*24*60*60
3584 def count_shares(si):
3585 return len(list(ss._iter_share_files(si)))
3586 def _get_sharefile(si):
3587 return list(ss._iter_share_files(si))[0]
3588 def count_leases(si):
3589 return len(list(_get_sharefile(si).get_leases()))
# back-date every lease on every share
3591 sf0 = _get_sharefile(immutable_si_0)
3592 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3593 sf1 = _get_sharefile(immutable_si_1)
3594 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3595 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3596 sf2 = _get_sharefile(mutable_si_2)
3597 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3598 sf3 = _get_sharefile(mutable_si_3)
3599 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3600 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3602 ss.setServiceParent(self.s)
3604 return bool(lc.get_state()["last-cycle-finished"] is not None)
3605 d = self.poll(_wait)
3607 def _after_first_cycle(ignored):
# only the immutable shares were expired
3608 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3609 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3610 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3611 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3612 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3613 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3614 d.addCallback(_after_first_cycle)
3615 d.addCallback(lambda ign: self.render1(webstatus))
3616 def _check_html(html):
3617 s = remove_tags(html)
3618 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3619 d.addCallback(_check_html)
# Mirror of test_only_immutable: with expiration_sharetypes=("mutable",)
# only the two mutable shares are expired; both immutable shares (and
# all their leases) survive the cycle.
# NOTE(review): gaps in this extraction — 'now = time.time()',
# slow_start setup, and the 'def _wait():' header appear elided; visible
# code kept verbatim.
3622 def test_only_mutable(self):
3623 basedir = "storage/LeaseCrawler/only_mutable"
3624 fileutil.make_dirs(basedir)
3626 then = int(now - 2000)
3627 ss = StorageServer(basedir, "\x00" * 20,
3628 expiration_enabled=True,
3629 expiration_mode="cutoff-date",
3630 expiration_cutoff_date=then,
3631 expiration_sharetypes=("mutable",))
3632 lc = ss.lease_checker
3634 webstatus = StorageStatus(ss)
3636 self.make_shares(ss)
3637 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3638 # set all leases to be expirable
3639 new_expiration_time = now - 3000 + 31*24*60*60
3641 def count_shares(si):
3642 return len(list(ss._iter_share_files(si)))
3643 def _get_sharefile(si):
3644 return list(ss._iter_share_files(si))[0]
3645 def count_leases(si):
3646 return len(list(_get_sharefile(si).get_leases()))
# back-date every lease on every share
3648 sf0 = _get_sharefile(immutable_si_0)
3649 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3650 sf1 = _get_sharefile(immutable_si_1)
3651 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3652 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3653 sf2 = _get_sharefile(mutable_si_2)
3654 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3655 sf3 = _get_sharefile(mutable_si_3)
3656 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3657 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3659 ss.setServiceParent(self.s)
3661 return bool(lc.get_state()["last-cycle-finished"] is not None)
3662 d = self.poll(_wait)
3664 def _after_first_cycle(ignored):
# only the mutable shares were expired
3665 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3666 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3667 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3668 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3669 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3670 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3671 d.addCallback(_after_first_cycle)
3672 d.addCallback(lambda ign: self.render1(webstatus))
3673 def _check_html(html):
3674 s = remove_tags(html)
3675 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3676 d.addCallback(_check_html)
def test_bad_mode(self):
    # Constructing a StorageServer with an unknown expiration_mode= must
    # raise ValueError naming the two supported modes.
    basedir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(basedir)
    err = self.failUnlessRaises(ValueError,
                                StorageServer, basedir, "\x00" * 20,
                                expiration_mode="bogus")
    self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'",
                      str(err))
# time_format.parse_duration must accept day/month/year units in several
# spellings and reject unknown units with a descriptive ValueError.
# NOTE(review): the lines defining DAY/MONTH/YEAR (used below) appear
# elided from this extraction — presumably DAY = 24*60*60 etc.; confirm
# against the original file. Visible code kept verbatim.
3687 def test_parse_duration(self):
3691 p = time_format.parse_duration
3692 self.failUnlessEqual(p("7days"), 7*DAY)
3693 self.failUnlessEqual(p("31day"), 31*DAY)
3694 self.failUnlessEqual(p("60 days"), 60*DAY)
3695 self.failUnlessEqual(p("2mo"), 2*MONTH)
3696 self.failUnlessEqual(p("3 month"), 3*MONTH)
3697 self.failUnlessEqual(p("2years"), 2*YEAR)
# unknown units must raise, with a helpful message
3698 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3699 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    # time_format.parse_date turns an ISO-format date string into an
    # integer number of seconds since the epoch.
    parse = time_format.parse_date
    result = parse("2009-03-18")
    self.failUnless(isinstance(result, int), result)
    self.failUnlessEqual(result, 1237334400)
# The crawler's per-cycle history must be bounded: after 16 cycles
# (numbers 0..15) only the 10 most recent entries (6..15) are retained.
# NOTE(review): gaps in this extraction — slow_start/cpu_slice setup,
# the 'return True/False' of the poller, and 'h = lc.get_state()
# ["history"]'-style lines appear elided; visible code kept verbatim.
3706 def test_limited_history(self):
3707 basedir = "storage/LeaseCrawler/limited_history"
3708 fileutil.make_dirs(basedir)
3709 ss = StorageServer(basedir, "\x00" * 20)
3710 # make it start sooner than usual.
3711 lc = ss.lease_checker
3715 # create a few shares, with some leases on them
3716 self.make_shares(ss)
3718 ss.setServiceParent(self.s)
3720 def _wait_until_15_cycles_done():
3721 last = lc.state["last-cycle-finished"]
3722 if last is not None and last >= 15:
3727 d = self.poll(_wait_until_15_cycles_done)
3729 def _check(ignored):
# only the 10 newest cycles (6..15) remain in history
3732 self.failUnlessEqual(len(h), 10)
3733 self.failUnlessEqual(max(h.keys()), 15)
3734 self.failUnlessEqual(min(h.keys()), 6)
3735 d.addCallback(_check)
def test_unpredictable_future(self):
    """Interrupt the crawler before it has finished even one prefix.

    At that point the progress measurer still reports 0% complete, so
    the expirer's cannot-predict-the-future code must leave every
    prediction in both the remaining-cycle and current-cycle
    "space-recovered" estimates set to None rather than a number.
    """
    basedir = "storage/LeaseCrawler/unpredictable_future"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20)
    # make it start sooner than usual.
    lc = ss.lease_checker
    lc.slow_start = 0
    lc.cpu_slice = -1.0 # stop quickly

    self.make_shares(ss)

    ss.setServiceParent(self.s)

    d = fireEventually()
    def _check(ignored):
        # this should fire after the first bucket is complete, but before
        # the first prefix is complete, so the progress-measurer won't
        # think we've gotten far enough to raise our percent-complete
        # above 0%, triggering the cannot-predict-the-future code in
        # expirer.py . This will have to change if/when the
        # progress-measurer gets smart enough to count buckets (we'll
        # have to interrupt it even earlier, before it's finished the
        # first bucket).
        s = lc.get_state()
        if "cycle-to-date" not in s:
            # the crawler has not processed its first bucket yet:
            # reschedule this check for a later turn
            d2 = fireEventually()
            d2.addCallback(_check)
            return d2
        self.failUnlessIn("cycle-to-date", s)
        self.failUnlessIn("estimated-remaining-cycle", s)
        self.failUnlessIn("estimated-current-cycle", s)

        left = s["estimated-remaining-cycle"]["space-recovered"]
        self.failUnlessEqual(left["actual-buckets"], None)
        self.failUnlessEqual(left["original-buckets"], None)
        self.failUnlessEqual(left["configured-buckets"], None)
        self.failUnlessEqual(left["actual-shares"], None)
        self.failUnlessEqual(left["original-shares"], None)
        self.failUnlessEqual(left["configured-shares"], None)
        self.failUnlessEqual(left["actual-diskbytes"], None)
        self.failUnlessEqual(left["original-diskbytes"], None)
        self.failUnlessEqual(left["configured-diskbytes"], None)
        self.failUnlessEqual(left["actual-sharebytes"], None)
        self.failUnlessEqual(left["original-sharebytes"], None)
        self.failUnlessEqual(left["configured-sharebytes"], None)

        # BUGFIX: this previously re-read "estimated-remaining-cycle",
        # exactly duplicating the `left` assertions above and leaving
        # the whole-cycle estimate (asserted present earlier) untested.
        full = s["estimated-current-cycle"]["space-recovered"]
        self.failUnlessEqual(full["actual-buckets"], None)
        self.failUnlessEqual(full["original-buckets"], None)
        self.failUnlessEqual(full["configured-buckets"], None)
        self.failUnlessEqual(full["actual-shares"], None)
        self.failUnlessEqual(full["original-shares"], None)
        self.failUnlessEqual(full["configured-shares"], None)
        self.failUnlessEqual(full["actual-diskbytes"], None)
        self.failUnlessEqual(full["original-diskbytes"], None)
        self.failUnlessEqual(full["configured-diskbytes"], None)
        self.failUnlessEqual(full["actual-sharebytes"], None)
        self.failUnlessEqual(full["original-sharebytes"], None)
        self.failUnlessEqual(full["configured-sharebytes"], None)

    d.addCallback(_check)
    return d
# When os.stat() results lack .st_blocks (via No_ST_BLOCKS_StorageServer)
# the crawler must fall back to reporting diskbytes equal to sharebytes.
# A negative override duration makes every lease already expired, so the
# "configured-" counters are non-zero.
# NOTE(review): gaps in this extraction — slow_start setup, the
# 'def _wait():' header, and 's = lc.get_state()' appear elided; visible
# code kept verbatim.
3801 def test_no_st_blocks(self):
3802 basedir = "storage/LeaseCrawler/no_st_blocks"
3803 fileutil.make_dirs(basedir)
3804 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3805 expiration_mode="age",
3806 expiration_override_lease_duration=-1000)
3807 # a negative expiration_time= means the "configured-"
3808 # space-recovered counts will be non-zero, since all shares will have
3811 # make it start sooner than usual.
3812 lc = ss.lease_checker
3815 self.make_shares(ss)
3816 ss.setServiceParent(self.s)
3818 return bool(lc.get_state()["last-cycle-finished"] is not None)
3819 d = self.poll(_wait)
3821 def _check(ignored):
3823 last = s["history"][0]
3824 rec = last["space-recovered"]
3825 self.failUnlessEqual(rec["configured-buckets"], 4)
3826 self.failUnlessEqual(rec["configured-shares"], 4)
3827 self.failUnless(rec["configured-sharebytes"] > 0,
3828 rec["configured-sharebytes"])
3829 # without the .st_blocks field in os.stat() results, we should be
3830 # reporting diskbytes==sharebytes
3831 self.failUnlessEqual(rec["configured-sharebytes"],
3832 rec["configured-diskbytes"])
3833 d.addCallback(_check)
# Corrupt one share on disk, then verify the lease crawler survives it:
# it records the corrupt share in "corrupt-shares" (checked via raw
# state, JSON rendering, and HTML rendering), keeps crawling the
# remaining buckets, and the logged container-version errors are
# expected and flushed at the end.
# NOTE(review): this listing has gaps (e.g. the open() call before the
# f.write, the _wait() def line, and get_state() calls inside the
# callbacks are missing original lines) -- not complete as shown.
3836 def test_share_corruption(self):
3837 self._poll_should_ignore_these_errors = [
3838 UnknownMutableContainerVersionError,
3839 UnknownImmutableContainerVersionError,
3841 basedir = "storage/LeaseCrawler/share_corruption"
3842 fileutil.make_dirs(basedir)
3843 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3844 w = StorageStatus(ss)
3845 # make it start sooner than usual.
3846 lc = ss.lease_checker
3847 lc.stop_after_first_bucket = True
3851 # create a few shares, with some leases on them
3852 self.make_shares(ss)
3854 # now corrupt one, and make sure the lease-checker keeps going
3855 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3856 first = min(self.sis)
3857 first_b32 = base32.b2a(first)
3858 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3861 f.write("BAD MAGIC")
3863 # if get_share_file() doesn't see the correct mutable magic, it
3864 # assumes the file is an immutable share, and then
3865 # immutable.ShareFile sees a bad version. So regardless of which kind
3866 # of share we corrupted, this will trigger an
3867 # UnknownImmutableContainerVersionError.
3869 # also create an empty bucket
3870 empty_si = base32.b2a("\x04"*16)
3871 empty_bucket_dir = os.path.join(ss.sharedir,
3872 storage_index_to_dir(empty_si))
3873 fileutil.make_dirs(empty_bucket_dir)
3875 ss.setServiceParent(self.s)
3877 d = fireEventually()
3879 # now examine the state right after the first bucket has been
3881 def _after_first_bucket(ignored):
3883 if "cycle-to-date" not in s:
3884 d2 = fireEventually()
3885 d2.addCallback(_after_first_bucket)
3887 so_far = s["cycle-to-date"]
3888 rec = so_far["space-recovered"]
3889 self.failUnlessEqual(rec["examined-buckets"], 1)
3890 self.failUnlessEqual(rec["examined-shares"], 0)
3891 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3892 d.addCallback(_after_first_bucket)
3894 d.addCallback(lambda ign: self.render_json(w))
3895 def _check_json(json):
3896 data = simplejson.loads(json)
3897 # grr. json turns all dict keys into strings.
3898 so_far = data["lease-checker"]["cycle-to-date"]
3899 corrupt_shares = so_far["corrupt-shares"]
3900 # it also turns all tuples into lists
3901 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3902 d.addCallback(_check_json)
3903 d.addCallback(lambda ign: self.render1(w))
3904 def _check_html(html):
3905 s = remove_tags(html)
3906 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3907 d.addCallback(_check_html)
# after the first full cycle: 5 buckets examined (4 shares + the empty
# bucket), only 3 healthy shares counted, corrupt one still reported
3910 return bool(lc.get_state()["last-cycle-finished"] is not None)
3911 d.addCallback(lambda ign: self.poll(_wait))
3913 def _after_first_cycle(ignored):
3915 last = s["history"][0]
3916 rec = last["space-recovered"]
3917 self.failUnlessEqual(rec["examined-buckets"], 5)
3918 self.failUnlessEqual(rec["examined-shares"], 3)
3919 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3920 d.addCallback(_after_first_cycle)
3921 d.addCallback(lambda ign: self.render_json(w))
3922 def _check_json_history(json):
3923 data = simplejson.loads(json)
3924 last = data["lease-checker"]["history"]["0"]
3925 corrupt_shares = last["corrupt-shares"]
3926 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3927 d.addCallback(_check_json_history)
3928 d.addCallback(lambda ign: self.render1(w))
3929 def _check_html_history(html):
3930 s = remove_tags(html)
3931 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3932 d.addCallback(_check_html_history)
# the corruption deliberately provoked container-version errors; clear
# them so trial does not fail the test for unexpected logged errors
3935 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3936 UnknownImmutableContainerVersionError)
# Render the given page with t=json query argument; presumably returns
# the deferred from render1 -- the trailing return line is missing from
# this listing (TODO confirm against the full file).
3941 def render_json(self, page):
3942 d = self.render1(page, args={"t": ["json"]})
# Tests for the storage-server web status page (StorageStatus).
# A MultiService is started per-test so StorageServer instances can be
# attached as children and cleanly stopped in tearDown.
# NOTE(review): the setUp/tearDown "def" lines are missing from this
# listing; only their bodies appear below.
3945 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3948 self.s = service.MultiService()
3949 self.s.startService()
3951 return self.s.stopService()
3953 def test_no_server(self):
3954 w = StorageStatus(None)
3955 html = w.renderSynchronously()
3956 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
# Basic status-page test: a fresh StorageServer with default settings
# should render as accepting shares with zero reserved space, and the
# JSON rendering should expose the stats counters plus the
# bucket-counter and lease-checker sections.
# NOTE(review): this listing has gaps (the initial self.render1(w) call,
# the line binding s from the JSON data, and the final return d are
# missing original lines) -- not complete as shown.
3958 def test_status(self):
3959 basedir = "storage/WebStatus/status"
3960 fileutil.make_dirs(basedir)
3961 ss = StorageServer(basedir, "\x00" * 20)
3962 ss.setServiceParent(self.s)
3963 w = StorageStatus(ss)
3965 def _check_html(html):
3966 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3967 s = remove_tags(html)
3968 self.failUnlessIn("Accepting new shares: Yes", s)
3969 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3970 d.addCallback(_check_html)
3971 d.addCallback(lambda ign: self.render_json(w))
3972 def _check_json(json):
3973 data = simplejson.loads(json)
3975 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3976 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3977 self.failUnlessIn("bucket-counter", data)
3978 self.failUnlessIn("lease-checker", data)
3979 d.addCallback(_check_json)
# Render the given page with t=json query argument; presumably returns
# the deferred from render1 -- the trailing return line is missing from
# this listing (TODO confirm against the full file).
3982 def render_json(self, page):
3983 d = self.render1(page, args={"t": ["json"]})
3986 @mock.patch('allmydata.util.fileutil.get_disk_stats')
3987 def test_status_no_disk_stats(self, mock_get_disk_stats):
3988 mock_get_disk_stats.side_effect = AttributeError()
3990 # Some platforms may have no disk stats API. Make sure the code can handle that
3991 # (test runs on all platforms).
3992 basedir = "storage/WebStatus/status_no_disk_stats"
3993 fileutil.make_dirs(basedir)
3994 ss = StorageServer(basedir, "\x00" * 20)
3995 ss.setServiceParent(self.s)
3996 w = StorageStatus(ss)
3997 html = w.renderSynchronously()
3998 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3999 s = remove_tags(html)
4000 self.failUnlessIn("Accepting new shares: Yes", s)
4001 self.failUnlessIn("Total disk space: ?", s)
4002 self.failUnlessIn("Space Available to Tahoe: ?", s)
4003 self.failUnless(ss.get_available_space() is None)
4005 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4006 def test_status_bad_disk_stats(self, mock_get_disk_stats):
4007 mock_get_disk_stats.side_effect = OSError()
4009 # If the API to get disk stats exists but a call to it fails, then the status should
4010 # show that no shares will be accepted, and get_available_space() should be 0.
4011 basedir = "storage/WebStatus/status_bad_disk_stats"
4012 fileutil.make_dirs(basedir)
4013 ss = StorageServer(basedir, "\x00" * 20)
4014 ss.setServiceParent(self.s)
4015 w = StorageStatus(ss)
4016 html = w.renderSynchronously()
4017 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4018 s = remove_tags(html)
4019 self.failUnlessIn("Accepting new shares: No", s)
4020 self.failUnlessIn("Total disk space: ?", s)
4021 self.failUnlessIn("Space Available to Tahoe: ?", s)
4022 self.failUnlessEqual(ss.get_available_space(), 0)
# Feed the status page known disk stats via a mocked get_disk_stats and
# verify every rendered space figure, that get_disk_stats was always
# called with (sharedir, reserved_space), and that get_available_space()
# equals free_for_nonroot minus the reservation.
# NOTE(review): this listing has gaps (the GB constant and total
# definitions, the 'total'/'used'/'avail' keys closing the mocked return
# dict, and a couple of lines around the call_args check are missing
# original lines) -- not complete as shown.
4024 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4025 def test_status_right_disk_stats(self, mock_get_disk_stats):
4028 free_for_root = 4*GB
4029 free_for_nonroot = 3*GB
4030 reserved_space = 1*GB
4031 used = total - free_for_root
4032 avail = max(free_for_nonroot - reserved_space, 0)
4033 mock_get_disk_stats.return_value = {
4035 'free_for_root': free_for_root,
4036 'free_for_nonroot': free_for_nonroot,
4041 basedir = "storage/WebStatus/status_right_disk_stats"
4042 fileutil.make_dirs(basedir)
4043 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4044 expecteddir = ss.sharedir
4045 ss.setServiceParent(self.s)
4046 w = StorageStatus(ss)
4047 html = w.renderSynchronously()
# every recorded call must have been (sharedir, reserved_space); collect
# any mismatching call signatures and fail if the list is non-empty
4049 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4050 mock_get_disk_stats.call_args_list)
4052 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4053 s = remove_tags(html)
4054 self.failUnlessIn("Total disk space: 5.00 GB", s)
4055 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4056 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4057 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4058 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4059 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4060 self.failUnlessEqual(ss.get_available_space(), 2*GB)
4062 def test_readonly(self):
4063 basedir = "storage/WebStatus/readonly"
4064 fileutil.make_dirs(basedir)
4065 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4066 ss.setServiceParent(self.s)
4067 w = StorageStatus(ss)
4068 html = w.renderSynchronously()
4069 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4070 s = remove_tags(html)
4071 self.failUnlessIn("Accepting new shares: No", s)
4073 def test_reserved(self):
4074 basedir = "storage/WebStatus/reserved"
4075 fileutil.make_dirs(basedir)
4076 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4077 ss.setServiceParent(self.s)
4078 w = StorageStatus(ss)
4079 html = w.renderSynchronously()
4080 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4081 s = remove_tags(html)
4082 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4084 def test_huge_reserved(self):
4085 basedir = "storage/WebStatus/reserved"
4086 fileutil.make_dirs(basedir)
4087 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4088 ss.setServiceParent(self.s)
4089 w = StorageStatus(ss)
4090 html = w.renderSynchronously()
4091 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4092 s = remove_tags(html)
4093 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4095 def test_util(self):
4096 w = StorageStatus(None)
4097 self.failUnlessEqual(w.render_space(None, None), "?")
4098 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4099 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4100 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4101 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4102 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)