2 import time, os.path, platform, stat, re, simplejson, struct, shutil
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap.api import fireEventually
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
23 LayoutInvalid, MDMFSIGNABLEHEADER, \
24 SIGNED_PREFIX, MDMFHEADER, \
25 MDMFOFFSETS, SDMFSlotWriteProxy, \
28 VERIFICATION_KEY_SIZE, \
30 from allmydata.interfaces import BadWriteEnablerError
31 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
32 from allmydata.test.common_web import WebRenderingMixin
33 from allmydata.test.no_network import NoNetworkServer
34 from allmydata.web.storage import StorageStatus, remove_prefix
39 def __init__(self, ignore_disconnectors=False):
40 self.ignore = ignore_disconnectors
41 self.disconnectors = {}
42 def notifyOnDisconnect(self, f, *args, **kwargs):
46 self.disconnectors[m] = (f, args, kwargs)
48 def dontNotifyOnDisconnect(self, marker):
51 del self.disconnectors[marker]
53 class FakeStatsProvider:
54 def count(self, name, delta=1):
56 def register_producer(self, producer):
59 class Bucket(unittest.TestCase):
60 def make_workdir(self, name):
61 basedir = os.path.join("storage", "Bucket", name)
62 incoming = os.path.join(basedir, "tmp", "bucket")
63 final = os.path.join(basedir, "bucket")
64 fileutil.make_dirs(basedir)
65 fileutil.make_dirs(os.path.join(basedir, "tmp"))
66 return incoming, final
68 def bucket_writer_closed(self, bw, consumed):
70 def add_latency(self, category, latency):
72 def count(self, name, delta=1):
77 renew_secret = os.urandom(32)
78 cancel_secret = os.urandom(32)
79 expiration_time = time.time() + 5000
80 return LeaseInfo(owner_num, renew_secret, cancel_secret,
81 expiration_time, "\x00" * 20)
83 def test_create(self):
84 incoming, final = self.make_workdir("test_create")
85 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
87 bw.remote_write(0, "a"*25)
88 bw.remote_write(25, "b"*25)
89 bw.remote_write(50, "c"*25)
90 bw.remote_write(75, "d"*7)
93 def test_readwrite(self):
94 incoming, final = self.make_workdir("test_readwrite")
95 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
97 bw.remote_write(0, "a"*25)
98 bw.remote_write(25, "b"*25)
99 bw.remote_write(50, "c"*7) # last block may be short
103 br = BucketReader(self, bw.finalhome)
104 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
105 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
106 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
108 def test_read_past_end_of_share_data(self):
109 # test vector for immutable files (hard-coded contents of an immutable share
112 # The following immutable share file content is identical to that
113 # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
114 # with share data == 'a'. The total size of this content is 85
117 containerdata = struct.pack('>LLL', 1, 1, 1)
119 # A Tahoe-LAFS storage client would send as the share_data a
120 # complicated string involving hash trees and a URI Extension Block
121 # -- see allmydata/immutable/layout.py . This test, which is
122 # simulating a client, just sends 'a'.
125 ownernumber = struct.pack('>L', 0)
126 renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
127 assert len(renewsecret) == 32
128 cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
129 assert len(cancelsecret) == 32
130 expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
132 lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
134 share_file_data = containerdata + share_data + lease_data
136 incoming, final = self.make_workdir("test_read_past_end_of_share_data")
138 fileutil.write(final, share_file_data)
140 class MockStorageServer(object):
141 def add_latency(self, category, latency):
143 def count(self, name, delta=1):
146 mockstorageserver = MockStorageServer()
149 br = BucketReader(mockstorageserver, final)
151 self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
153 # Read past the end of share data to get the cancel secret.
154 read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
156 result_of_read = br.remote_read(0, read_length)
157 self.failUnlessEqual(result_of_read, share_data)
159 result_of_read = br.remote_read(0, len(share_data)+1)
160 self.failUnlessEqual(result_of_read, share_data)
168 def callRemote(self, methname, *args, **kwargs):
170 meth = getattr(self.target, "remote_" + methname)
171 return meth(*args, **kwargs)
173 if methname == "slot_readv":
175 if "writev" in methname:
176 self.write_count += 1
178 return defer.maybeDeferred(_call)
181 class BucketProxy(unittest.TestCase):
182 def make_bucket(self, name, size):
183 basedir = os.path.join("storage", "BucketProxy", name)
184 incoming = os.path.join(basedir, "tmp", "bucket")
185 final = os.path.join(basedir, "bucket")
186 fileutil.make_dirs(basedir)
187 fileutil.make_dirs(os.path.join(basedir, "tmp"))
188 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
194 def make_lease(self):
196 renew_secret = os.urandom(32)
197 cancel_secret = os.urandom(32)
198 expiration_time = time.time() + 5000
199 return LeaseInfo(owner_num, renew_secret, cancel_secret,
200 expiration_time, "\x00" * 20)
202 def bucket_writer_closed(self, bw, consumed):
204 def add_latency(self, category, latency):
206 def count(self, name, delta=1):
209 def test_create(self):
210 bw, rb, sharefname = self.make_bucket("test_create", 500)
211 bp = WriteBucketProxy(rb, None,
216 uri_extension_size_max=500)
217 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
219 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
220 # Let's pretend each share has 100 bytes of data, and that there are
221 # 4 segments (25 bytes each), and 8 shares total. So the two
222 # per-segment merkle trees (crypttext_hash_tree,
223 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
224 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
225 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
226 # long. That should make the whole share:
228 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
229 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
231 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
233 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
235 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
237 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
239 uri_extension = "s" + "E"*498 + "e"
241 bw, rb, sharefname = self.make_bucket(name, sharesize)
242 bp = wbp_class(rb, None,
247 uri_extension_size_max=len(uri_extension))
250 d.addCallback(lambda res: bp.put_block(0, "a"*25))
251 d.addCallback(lambda res: bp.put_block(1, "b"*25))
252 d.addCallback(lambda res: bp.put_block(2, "c"*25))
253 d.addCallback(lambda res: bp.put_block(3, "d"*20))
254 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
255 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
256 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
257 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
258 d.addCallback(lambda res: bp.close())
260 # now read everything back
261 def _start_reading(res):
262 br = BucketReader(self, sharefname)
265 server = NoNetworkServer("abc", None)
266 rbp = rbp_class(rb, server, storage_index="")
267 self.failUnlessIn("to peer", repr(rbp))
268 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
270 d1 = rbp.get_block_data(0, 25, 25)
271 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
272 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
273 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
274 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
275 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
276 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
277 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
279 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
280 d1.addCallback(lambda res:
281 self.failUnlessEqual(res, crypttext_hashes))
282 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
283 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
284 d1.addCallback(lambda res: rbp.get_share_hashes())
285 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
286 d1.addCallback(lambda res: rbp.get_uri_extension())
287 d1.addCallback(lambda res:
288 self.failUnlessEqual(res, uri_extension))
292 d.addCallback(_start_reading)
296 def test_readwrite_v1(self):
297 return self._do_test_readwrite("test_readwrite_v1",
298 0x24, WriteBucketProxy, ReadBucketProxy)
300 def test_readwrite_v2(self):
301 return self._do_test_readwrite("test_readwrite_v2",
302 0x44, WriteBucketProxy_v2, ReadBucketProxy)
304 class Server(unittest.TestCase):
307 self.sparent = LoggingServiceParent()
308 self.sparent.startService()
309 self._lease_secret = itertools.count()
311 return self.sparent.stopService()
313 def workdir(self, name):
314 basedir = os.path.join("storage", "Server", name)
317 def create(self, name, reserved_space=0, klass=StorageServer):
318 workdir = self.workdir(name)
319 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
320 stats_provider=FakeStatsProvider())
321 ss.setServiceParent(self.sparent)
324 def test_create(self):
325 self.create("test_create")
327 def test_declares_fixed_1528(self):
328 ss = self.create("test_declares_fixed_1528")
329 ver = ss.remote_get_version()
330 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
331 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
333 def test_declares_maximum_share_sizes(self):
334 ss = self.create("test_declares_maximum_share_sizes")
335 ver = ss.remote_get_version()
336 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
337 self.failUnlessIn('maximum-immutable-share-size', sv1)
338 self.failUnlessIn('maximum-mutable-share-size', sv1)
340 def test_declares_available_space(self):
341 ss = self.create("test_declares_available_space")
342 ver = ss.remote_get_version()
343 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
344 self.failUnlessIn('available-space', sv1)
346 def allocate(self, ss, storage_index, sharenums, size, canary=None):
347 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
348 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
350 canary = FakeCanary()
351 return ss.remote_allocate_buckets(storage_index,
352 renew_secret, cancel_secret,
353 sharenums, size, canary)
355 def test_large_share(self):
356 syslow = platform.system().lower()
357 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
358 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
360 avail = fileutil.get_available_space('.', 512*2**20)
362 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
364 ss = self.create("test_large_share")
366 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
367 self.failUnlessEqual(already, set())
368 self.failUnlessEqual(set(writers.keys()), set([0]))
370 shnum, bucket = writers.items()[0]
371 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
372 bucket.remote_write(2**32, "ab")
373 bucket.remote_close()
375 readers = ss.remote_get_buckets("allocate")
376 reader = readers[shnum]
377 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
379 def test_dont_overfill_dirs(self):
381 This test asserts that if you add a second share whose storage index
382 share lots of leading bits with an extant share (but isn't the exact
383 same storage index), this won't add an entry to the share directory.
385 ss = self.create("test_dont_overfill_dirs")
386 already, writers = self.allocate(ss, "storageindex", [0], 10)
387 for i, wb in writers.items():
388 wb.remote_write(0, "%10d" % i)
390 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
392 children_of_storedir = set(os.listdir(storedir))
394 # Now store another one under another storageindex that has leading
395 # chars the same as the first storageindex.
396 already, writers = self.allocate(ss, "storageindey", [0], 10)
397 for i, wb in writers.items():
398 wb.remote_write(0, "%10d" % i)
400 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
402 new_children_of_storedir = set(os.listdir(storedir))
403 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
405 def test_remove_incoming(self):
406 ss = self.create("test_remove_incoming")
407 already, writers = self.allocate(ss, "vid", range(3), 10)
408 for i,wb in writers.items():
409 wb.remote_write(0, "%10d" % i)
411 incoming_share_dir = wb.incominghome
412 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
413 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
414 incoming_dir = os.path.dirname(incoming_prefix_dir)
415 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
416 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
417 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
419 def test_abort(self):
420 # remote_abort, when called on a writer, should make sure that
421 # the allocated size of the bucket is not counted by the storage
422 # server when accounting for space.
423 ss = self.create("test_abort")
424 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
425 self.failIfEqual(ss.allocated_size(), 0)
427 # Now abort the writers.
428 for writer in writers.itervalues():
429 writer.remote_abort()
430 self.failUnlessEqual(ss.allocated_size(), 0)
433 def test_allocate(self):
434 ss = self.create("test_allocate")
436 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
438 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
439 self.failUnlessEqual(already, set())
440 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
442 # while the buckets are open, they should not count as readable
443 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
446 for i,wb in writers.items():
447 wb.remote_write(0, "%25d" % i)
449 # aborting a bucket that was already closed is a no-op
452 # now they should be readable
453 b = ss.remote_get_buckets("allocate")
454 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
455 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
457 self.failUnlessIn("BucketReader", b_str)
458 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
460 # now if we ask about writing again, the server should offer those
461 # three buckets as already present. It should offer them even if we
462 # don't ask about those specific ones.
463 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
464 self.failUnlessEqual(already, set([0,1,2]))
465 self.failUnlessEqual(set(writers.keys()), set([3,4]))
467 # while those two buckets are open for writing, the server should
468 # refuse to offer them to uploaders
470 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
471 self.failUnlessEqual(already2, set([0,1,2]))
472 self.failUnlessEqual(set(writers2.keys()), set([5]))
474 # aborting the writes should remove the tempfiles
475 for i,wb in writers2.items():
477 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
478 self.failUnlessEqual(already2, set([0,1,2]))
479 self.failUnlessEqual(set(writers2.keys()), set([5]))
481 for i,wb in writers2.items():
483 for i,wb in writers.items():
486 def test_bad_container_version(self):
487 ss = self.create("test_bad_container_version")
488 a,w = self.allocate(ss, "si1", [0], 10)
489 w[0].remote_write(0, "\xff"*10)
492 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
495 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
498 ss.remote_get_buckets("allocate")
500 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
501 ss.remote_get_buckets, "si1")
502 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
504 def test_disconnect(self):
505 # simulate a disconnection
506 ss = self.create("test_disconnect")
507 canary = FakeCanary()
508 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
509 self.failUnlessEqual(already, set())
510 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
511 for (f,args,kwargs) in canary.disconnectors.values():
516 # that ought to delete the incoming shares
517 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
518 self.failUnlessEqual(already, set())
519 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
521 def test_reserved_space(self):
525 def call_get_disk_stats(whichdir, reserved_space=0):
526 self.failUnlessEqual(reserved_space, reserved)
528 'free_for_nonroot': 15000 - allocated,
529 'avail': max(15000 - allocated - reserved_space, 0),
531 self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
533 ss = self.create("test_reserved_space", reserved_space=reserved)
534 # 15k available, 10k reserved, leaves 5k for shares
536 # a newly created and filled share incurs this much overhead, beyond
537 # the size we request.
539 LEASE_SIZE = 4+32+32+4
540 canary = FakeCanary(True)
541 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
542 self.failUnlessEqual(len(writers), 3)
543 # now the StorageServer should have 3000 bytes provisionally
544 # allocated, allowing only 2000 more to be claimed
545 self.failUnlessEqual(len(ss._active_writers), 3)
547 # allocating 1001-byte shares only leaves room for one
548 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
549 self.failUnlessEqual(len(writers2), 1)
550 self.failUnlessEqual(len(ss._active_writers), 4)
552 # we abandon the first set, so their provisional allocation should be
556 self.failUnlessEqual(len(ss._active_writers), 1)
557 # now we have a provisional allocation of 1001 bytes
559 # and we close the second set, so their provisional allocation should
560 # become real, long-term allocation, and grows to include the
562 for bw in writers2.values():
563 bw.remote_write(0, "a"*25)
568 self.failUnlessEqual(len(ss._active_writers), 0)
570 # this also changes the amount reported as available by call_get_disk_stats
571 allocated = 1001 + OVERHEAD + LEASE_SIZE
573 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
574 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
575 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
576 self.failUnlessEqual(len(writers3), 39)
577 self.failUnlessEqual(len(ss._active_writers), 39)
581 self.failUnlessEqual(len(ss._active_writers), 0)
582 ss.disownServiceParent()
586 basedir = self.workdir("test_seek_behavior")
587 fileutil.make_dirs(basedir)
588 filename = os.path.join(basedir, "testfile")
589 f = open(filename, "wb")
592 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
593 # files. mode="a" preserves previous contents but does not allow
594 # seeking-to-create-holes. mode="r+" allows both.
595 f = open(filename, "rb+")
599 filelen = os.stat(filename)[stat.ST_SIZE]
600 self.failUnlessEqual(filelen, 100+3)
601 f2 = open(filename, "rb")
602 self.failUnlessEqual(f2.read(5), "start")
605 def test_leases(self):
606 ss = self.create("test_leases")
607 canary = FakeCanary()
611 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
612 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
613 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
614 sharenums, size, canary)
615 self.failUnlessEqual(len(already), 0)
616 self.failUnlessEqual(len(writers), 5)
617 for wb in writers.values():
620 leases = list(ss.get_leases("si0"))
621 self.failUnlessEqual(len(leases), 1)
622 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
624 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
625 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
626 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
627 sharenums, size, canary)
628 for wb in writers.values():
631 # take out a second lease on si1
632 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
633 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
634 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
635 sharenums, size, canary)
636 self.failUnlessEqual(len(already), 5)
637 self.failUnlessEqual(len(writers), 0)
639 leases = list(ss.get_leases("si1"))
640 self.failUnlessEqual(len(leases), 2)
641 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
643 # and a third lease, using add-lease
644 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
645 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
646 ss.remote_add_lease("si1", rs2a, cs2a)
647 leases = list(ss.get_leases("si1"))
648 self.failUnlessEqual(len(leases), 3)
649 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
651 # add-lease on a missing storage index is silently ignored
652 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
654 # check that si0 is readable
655 readers = ss.remote_get_buckets("si0")
656 self.failUnlessEqual(len(readers), 5)
658 # renew the first lease. Only the proper renew_secret should work
659 ss.remote_renew_lease("si0", rs0)
660 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
661 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
663 # check that si0 is still readable
664 readers = ss.remote_get_buckets("si0")
665 self.failUnlessEqual(len(readers), 5)
667 # There is no such method as remote_cancel_lease for now -- see
669 self.failIf(hasattr(ss, 'remote_cancel_lease'), \
670 "ss should not have a 'remote_cancel_lease' method/attribute")
672 # test overlapping uploads
673 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
674 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
675 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
676 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
677 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
678 sharenums, size, canary)
679 self.failUnlessEqual(len(already), 0)
680 self.failUnlessEqual(len(writers), 5)
681 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
682 sharenums, size, canary)
683 self.failUnlessEqual(len(already2), 0)
684 self.failUnlessEqual(len(writers2), 0)
685 for wb in writers.values():
688 leases = list(ss.get_leases("si3"))
689 self.failUnlessEqual(len(leases), 1)
691 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
692 sharenums, size, canary)
693 self.failUnlessEqual(len(already3), 5)
694 self.failUnlessEqual(len(writers3), 0)
696 leases = list(ss.get_leases("si3"))
697 self.failUnlessEqual(len(leases), 2)
699 def test_readonly(self):
700 workdir = self.workdir("test_readonly")
701 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
702 ss.setServiceParent(self.sparent)
704 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
705 self.failUnlessEqual(already, set())
706 self.failUnlessEqual(writers, {})
708 stats = ss.get_stats()
709 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
710 if "storage_server.disk_avail" in stats:
711 # Some platforms may not have an API to get disk stats.
712 # But if there are stats, readonly_storage means disk_avail=0
713 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
715 def test_discard(self):
716 # discard is really only used for other tests, but we test it anyways
717 workdir = self.workdir("test_discard")
718 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
719 ss.setServiceParent(self.sparent)
721 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
722 self.failUnlessEqual(already, set())
723 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
724 for i,wb in writers.items():
725 wb.remote_write(0, "%25d" % i)
727 # since we discard the data, the shares should be present but sparse.
728 # Since we write with some seeks, the data we read back will be all
730 b = ss.remote_get_buckets("vid")
731 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
732 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
734 def test_advise_corruption(self):
735 workdir = self.workdir("test_advise_corruption")
736 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
737 ss.setServiceParent(self.sparent)
739 si0_s = base32.b2a("si0")
740 ss.remote_advise_corrupt_share("immutable", "si0", 0,
741 "This share smells funny.\n")
742 reportdir = os.path.join(workdir, "corruption-advisories")
743 reports = os.listdir(reportdir)
744 self.failUnlessEqual(len(reports), 1)
745 report_si0 = reports[0]
746 self.failUnlessIn(si0_s, report_si0)
747 f = open(os.path.join(reportdir, report_si0), "r")
750 self.failUnlessIn("type: immutable", report)
751 self.failUnlessIn("storage_index: %s" % si0_s, report)
752 self.failUnlessIn("share_number: 0", report)
753 self.failUnlessIn("This share smells funny.", report)
755 # test the RIBucketWriter version too
756 si1_s = base32.b2a("si1")
757 already,writers = self.allocate(ss, "si1", [1], 75)
758 self.failUnlessEqual(already, set())
759 self.failUnlessEqual(set(writers.keys()), set([1]))
760 writers[1].remote_write(0, "data")
761 writers[1].remote_close()
763 b = ss.remote_get_buckets("si1")
764 self.failUnlessEqual(set(b.keys()), set([1]))
765 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
767 reports = os.listdir(reportdir)
768 self.failUnlessEqual(len(reports), 2)
769 report_si1 = [r for r in reports if si1_s in r][0]
770 f = open(os.path.join(reportdir, report_si1), "r")
773 self.failUnlessIn("type: immutable", report)
774 self.failUnlessIn("storage_index: %s" % si1_s, report)
775 self.failUnlessIn("share_number: 1", report)
776 self.failUnlessIn("This share tastes like dust.", report)
780 class MutableServer(unittest.TestCase):
783 self.sparent = LoggingServiceParent()
784 self._lease_secret = itertools.count()
786 return self.sparent.stopService()
788 def workdir(self, name):
789 basedir = os.path.join("storage", "MutableServer", name)
792 def create(self, name):
793 workdir = self.workdir(name)
794 ss = StorageServer(workdir, "\x00" * 20)
795 ss.setServiceParent(self.sparent)
798 def test_create(self):
799 self.create("test_create")
801 def write_enabler(self, we_tag):
802 return hashutil.tagged_hash("we_blah", we_tag)
804 def renew_secret(self, tag):
805 return hashutil.tagged_hash("renew_blah", str(tag))
807 def cancel_secret(self, tag):
808 return hashutil.tagged_hash("cancel_blah", str(tag))
810 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
811 write_enabler = self.write_enabler(we_tag)
812 renew_secret = self.renew_secret(lease_tag)
813 cancel_secret = self.cancel_secret(lease_tag)
814 rstaraw = ss.remote_slot_testv_and_readv_and_writev
815 testandwritev = dict( [ (shnum, ([], [], None) )
816 for shnum in sharenums ] )
818 rc = rstaraw(storage_index,
819 (write_enabler, renew_secret, cancel_secret),
822 (did_write, readv_data) = rc
823 self.failUnless(did_write)
824 self.failUnless(isinstance(readv_data, dict))
825 self.failUnlessEqual(len(readv_data), 0)
827 def test_bad_magic(self):
828 ss = self.create("test_bad_magic")
829 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
830 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
835 read = ss.remote_slot_readv
836 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
837 read, "si1", [0], [(0,10)])
838 self.failUnlessIn(" had magic ", str(e))
839 self.failUnlessIn(" but we wanted ", str(e))
841 def test_container_size(self):
842 ss = self.create("test_container_size")
843 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
845 read = ss.remote_slot_readv
846 rstaraw = ss.remote_slot_testv_and_readv_and_writev
847 secrets = ( self.write_enabler("we1"),
848 self.renew_secret("we1"),
849 self.cancel_secret("we1") )
850 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
851 answer = rstaraw("si1", secrets,
852 {0: ([], [(0,data)], len(data)+12)},
854 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
856 # Trying to make the container too large (by sending a write vector
857 # whose offset is too high) will raise an exception.
858 TOOBIG = MutableShareFile.MAX_SIZE + 10
859 self.failUnlessRaises(DataTooLargeError,
860 rstaraw, "si1", secrets,
861 {0: ([], [(TOOBIG,data)], None)},
864 answer = rstaraw("si1", secrets,
865 {0: ([], [(0,data)], None)},
867 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
869 read_answer = read("si1", [0], [(0,10)])
870 self.failUnlessEqual(read_answer, {0: [data[:10]]})
872 # Sending a new_length shorter than the current length truncates the
874 answer = rstaraw("si1", secrets,
877 read_answer = read("si1", [0], [(0,10)])
878 self.failUnlessEqual(read_answer, {0: [data[:9]]})
880 # Sending a new_length longer than the current length doesn't change
882 answer = rstaraw("si1", secrets,
885 assert answer == (True, {0:[],1:[],2:[]})
886 read_answer = read("si1", [0], [(0, 20)])
887 self.failUnlessEqual(read_answer, {0: [data[:9]]})
889 # Sending a write vector whose start is after the end of the current
890 # data doesn't reveal "whatever was there last time" (palimpsest),
891 # but instead fills with zeroes.
893 # To test this, we fill the data area with a recognizable pattern.
894 pattern = ''.join([chr(i) for i in range(100)])
895 answer = rstaraw("si1", secrets,
896 {0: ([], [(0, pattern)], None)},
898 assert answer == (True, {0:[],1:[],2:[]})
899 # Then truncate the data...
900 answer = rstaraw("si1", secrets,
903 assert answer == (True, {0:[],1:[],2:[]})
904 # Just confirm that you get an empty string if you try to read from
905 # past the (new) endpoint now.
906 answer = rstaraw("si1", secrets,
909 self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))
911 # Then the extend the file by writing a vector which starts out past
913 answer = rstaraw("si1", secrets,
914 {0: ([], [(50, 'hellothere')], None)},
916 assert answer == (True, {0:[],1:[],2:[]})
917 # Now if you read the stuff between 20 (where we earlier truncated)
918 # and 50, it had better be all zeroes.
919 answer = rstaraw("si1", secrets,
922 self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))
924 # Also see if the server explicitly declares that it supports this
926 ver = ss.remote_get_version()
927 storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
928 self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
930 # If the size is dropped to zero the share is deleted.
931 answer = rstaraw("si1", secrets,
932 {0: ([], [(0,data)], 0)},
934 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
936 read_answer = read("si1", [0], [(0,10)])
937 self.failUnlessEqual(read_answer, {})
939 def test_allocate(self):
940 ss = self.create("test_allocate")
941 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
944 read = ss.remote_slot_readv
945 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
947 self.failUnlessEqual(read("si1", [], [(0, 10)]),
948 {0: [""], 1: [""], 2: [""]})
949 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
953 secrets = ( self.write_enabler("we1"),
954 self.renew_secret("we1"),
955 self.cancel_secret("we1") )
956 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
957 write = ss.remote_slot_testv_and_readv_and_writev
958 answer = write("si1", secrets,
959 {0: ([], [(0,data)], None)},
961 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
963 self.failUnlessEqual(read("si1", [0], [(0,20)]),
964 {0: ["00000000001111111111"]})
965 self.failUnlessEqual(read("si1", [0], [(95,10)]),
967 #self.failUnlessEqual(s0.remote_get_length(), 100)
969 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
970 f = self.failUnlessRaises(BadWriteEnablerError,
971 write, "si1", bad_secrets,
973 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
975 # this testv should fail
976 answer = write("si1", secrets,
977 {0: ([(0, 12, "eq", "444444444444"),
978 (20, 5, "eq", "22222"),
985 self.failUnlessEqual(answer, (False,
986 {0: ["000000000011", "22222"],
990 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
993 answer = write("si1", secrets,
994 {0: ([(10, 5, "lt", "11111"),
1001 self.failUnlessEqual(answer, (False,
1006 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1009 def test_operators(self):
1010 # test operators, the data we're comparing is '11111' in all cases.
1011 # test both fail+pass, reset data after each one.
1012 ss = self.create("test_operators")
1014 secrets = ( self.write_enabler("we1"),
1015 self.renew_secret("we1"),
1016 self.cancel_secret("we1") )
1017 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1018 write = ss.remote_slot_testv_and_readv_and_writev
1019 read = ss.remote_slot_readv
1022 write("si1", secrets,
1023 {0: ([], [(0,data)], None)},
1029 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
1034 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1035 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1036 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
1039 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1044 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1045 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1048 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1053 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1054 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1058 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1063 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1064 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1067 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1072 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1073 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1076 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1081 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1082 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1086 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1091 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1092 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1095 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1100 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1101 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1105 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1110 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1111 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1114 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1119 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1120 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1124 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1129 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1130 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1133 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1138 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1139 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1142 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1147 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1148 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1152 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1157 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1158 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1161 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1166 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1167 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1170 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1175 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1176 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1179 # finally, test some operators against empty shares
1180 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1185 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1186 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1189 def test_readv(self):
1190 ss = self.create("test_readv")
1191 secrets = ( self.write_enabler("we1"),
1192 self.renew_secret("we1"),
1193 self.cancel_secret("we1") )
1194 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1195 write = ss.remote_slot_testv_and_readv_and_writev
1196 read = ss.remote_slot_readv
1197 data = [("%d" % i) * 100 for i in range(3)]
1198 rc = write("si1", secrets,
1199 {0: ([], [(0,data[0])], None),
1200 1: ([], [(0,data[1])], None),
1201 2: ([], [(0,data[2])], None),
1203 self.failUnlessEqual(rc, (True, {}))
1205 answer = read("si1", [], [(0, 10)])
1206 self.failUnlessEqual(answer, {0: ["0"*10],
1210 def compare_leases_without_timestamps(self, leases_a, leases_b):
1211 self.failUnlessEqual(len(leases_a), len(leases_b))
1212 for i in range(len(leases_a)):
1215 self.failUnlessEqual(a.owner_num, b.owner_num)
1216 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1217 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1218 self.failUnlessEqual(a.nodeid, b.nodeid)
1220 def compare_leases(self, leases_a, leases_b):
1221 self.failUnlessEqual(len(leases_a), len(leases_b))
1222 for i in range(len(leases_a)):
1225 self.failUnlessEqual(a.owner_num, b.owner_num)
1226 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1227 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1228 self.failUnlessEqual(a.nodeid, b.nodeid)
1229 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1231 def test_leases(self):
1232 ss = self.create("test_leases")
1234 return ( self.write_enabler("we1"),
1235 self.renew_secret("we1-%d" % n),
1236 self.cancel_secret("we1-%d" % n) )
1237 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1238 write = ss.remote_slot_testv_and_readv_and_writev
1239 read = ss.remote_slot_readv
1240 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1241 self.failUnlessEqual(rc, (True, {}))
1243 # create a random non-numeric file in the bucket directory, to
1244 # exercise the code that's supposed to ignore those.
1245 bucket_dir = os.path.join(self.workdir("test_leases"),
1246 "shares", storage_index_to_dir("si1"))
1247 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1248 f.write("you ought to be ignoring me\n")
1251 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1252 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1254 # add-lease on a missing storage index is silently ignored
1255 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1257 # re-allocate the slots and use the same secrets, that should update
1259 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1260 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1263 ss.remote_renew_lease("si1", secrets(0)[1])
1264 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1266 # now allocate them with a bunch of different secrets, to trigger the
1267 # extended lease code. Use add_lease for one of them.
1268 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1269 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1270 secrets2 = secrets(2)
1271 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1272 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1273 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1274 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1275 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1277 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1279 all_leases = list(s0.get_leases())
1280 # and write enough data to expand the container, forcing the server
1281 # to move the leases
1282 write("si1", secrets(0),
1283 {0: ([], [(0,data)], 200), },
1286 # read back the leases, make sure they're still intact.
1287 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1289 ss.remote_renew_lease("si1", secrets(0)[1])
1290 ss.remote_renew_lease("si1", secrets(1)[1])
1291 ss.remote_renew_lease("si1", secrets(2)[1])
1292 ss.remote_renew_lease("si1", secrets(3)[1])
1293 ss.remote_renew_lease("si1", secrets(4)[1])
1294 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1295 # get a new copy of the leases, with the current timestamps. Reading
1296 # data and failing to renew/cancel leases should leave the timestamps
1298 all_leases = list(s0.get_leases())
1299 # renewing with a bogus token should prompt an error message
1301 # examine the exception thus raised, make sure the old nodeid is
1302 # present, to provide for share migration
1303 e = self.failUnlessRaises(IndexError,
1304 ss.remote_renew_lease, "si1",
1307 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1308 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1309 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1311 self.compare_leases(all_leases, list(s0.get_leases()))
1313 # reading shares should not modify the timestamp
1314 read("si1", [], [(0,200)])
1315 self.compare_leases(all_leases, list(s0.get_leases()))
1317 write("si1", secrets(0),
1318 {0: ([], [(200, "make me bigger")], None)}, [])
1319 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1321 write("si1", secrets(0),
1322 {0: ([], [(500, "make me really bigger")], None)}, [])
1323 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1325 def test_remove(self):
1326 ss = self.create("test_remove")
1327 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1329 readv = ss.remote_slot_readv
1330 writev = ss.remote_slot_testv_and_readv_and_writev
1331 secrets = ( self.write_enabler("we1"),
1332 self.renew_secret("we1"),
1333 self.cancel_secret("we1") )
1334 # delete sh0 by setting its size to zero
1335 answer = writev("si1", secrets,
1338 # the answer should mention all the shares that existed before the
1340 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1341 # but a new read should show only sh1 and sh2
1342 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1345 # delete sh1 by setting its size to zero
1346 answer = writev("si1", secrets,
1349 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1350 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1353 # delete sh2 by setting its size to zero
1354 answer = writev("si1", secrets,
1357 self.failUnlessEqual(answer, (True, {2:[]}) )
1358 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1360 # and the bucket directory should now be gone
1361 si = base32.b2a("si1")
1362 # note: this is a detail of the storage server implementation, and
1363 # may change in the future
1365 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1366 bucketdir = os.path.join(prefixdir, si)
1367 self.failUnless(os.path.exists(prefixdir), prefixdir)
1368 self.failIf(os.path.exists(bucketdir), bucketdir)
1371 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1373 self.sparent = LoggingServiceParent()
1374 self._lease_secret = itertools.count()
1375 self.ss = self.create("MDMFProxies storage test server")
1376 self.rref = RemoteBucket()
1377 self.rref.target = self.ss
1378 self.secrets = (self.write_enabler("we_secret"),
1379 self.renew_secret("renew_secret"),
1380 self.cancel_secret("cancel_secret"))
1381 self.segment = "aaaaaa"
1383 self.salt = "a" * 16
1384 self.block_hash = "a" * 32
1385 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1386 self.share_hash = self.block_hash
1387 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1388 self.signature = "foobarbaz"
1389 self.verification_key = "vvvvvv"
1390 self.encprivkey = "private"
1391 self.root_hash = self.block_hash
1392 self.salt_hash = self.root_hash
1393 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1394 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1395 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1396 # blockhashes and salt hashes are serialized in the same way,
1397 # only we lop off the first element and store that in the
1399 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1403 self.sparent.stopService()
1404 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1407 def write_enabler(self, we_tag):
1408 return hashutil.tagged_hash("we_blah", we_tag)
1411 def renew_secret(self, tag):
1412 return hashutil.tagged_hash("renew_blah", str(tag))
1415 def cancel_secret(self, tag):
1416 return hashutil.tagged_hash("cancel_blah", str(tag))
1419 def workdir(self, name):
1420 basedir = os.path.join("storage", "MutableServer", name)
1424 def create(self, name):
1425 workdir = self.workdir(name)
1426 ss = StorageServer(workdir, "\x00" * 20)
1427 ss.setServiceParent(self.sparent)
1431 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1432 # Start with the checkstring
1433 data = struct.pack(">BQ32s",
1437 self.checkstring = data
1438 # Next, the encoding parameters
1440 data += struct.pack(">BBQQ",
1446 data += struct.pack(">BBQQ",
1452 data += struct.pack(">BBQQ",
1457 # Now we'll build the offsets.
1459 if not tail_segment and not empty:
1461 sharedata += self.salt + self.block
1464 sharedata += self.salt + self.block
1465 sharedata += self.salt + "a"
1467 # The encrypted private key comes after the shares + salts
1468 offset_size = struct.calcsize(MDMFOFFSETS)
1469 encrypted_private_key_offset = len(data) + offset_size
1470 # The share has chain comes after the private key
1471 sharehashes_offset = encrypted_private_key_offset + \
1472 len(self.encprivkey)
1474 # The signature comes after the share hash chain.
1475 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1477 verification_key_offset = signature_offset + len(self.signature)
1478 verification_key_end = verification_key_offset + \
1479 len(self.verification_key)
1481 share_data_offset = offset_size
1482 share_data_offset += PRIVATE_KEY_SIZE
1483 share_data_offset += SIGNATURE_SIZE
1484 share_data_offset += VERIFICATION_KEY_SIZE
1485 share_data_offset += SHARE_HASH_CHAIN_SIZE
1487 blockhashes_offset = share_data_offset + len(sharedata)
1488 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1490 data += struct.pack(MDMFOFFSETS,
1491 encrypted_private_key_offset,
1494 verification_key_offset,
1495 verification_key_end,
1501 self.offsets['enc_privkey'] = encrypted_private_key_offset
1502 self.offsets['block_hash_tree'] = blockhashes_offset
1503 self.offsets['share_hash_chain'] = sharehashes_offset
1504 self.offsets['signature'] = signature_offset
1505 self.offsets['verification_key'] = verification_key_offset
1506 self.offsets['share_data'] = share_data_offset
1507 self.offsets['verification_key_end'] = verification_key_end
1508 self.offsets['EOF'] = eof_offset
1511 data += self.encprivkey
1513 data += self.share_hash_chain_s
1515 data += self.signature
1516 # and the verification key
1517 data += self.verification_key
1518 # Then we'll add in gibberish until we get to the right point.
1519 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1522 # Then the share data
1525 data += self.block_hash_tree_s
1529 def write_test_share_to_server(self,
1534 I write some data for the read tests to read to self.ss
1536 If tail_segment=True, then I will write a share that has a
1537 smaller tail segment than other segments.
1539 write = self.ss.remote_slot_testv_and_readv_and_writev
1540 data = self.build_test_mdmf_share(tail_segment, empty)
1541 # Finally, we write the whole thing to the storage server in one
1543 testvs = [(0, 1, "eq", "")]
1545 tws[0] = (testvs, [(0, data)], None)
1547 results = write(storage_index, self.secrets, tws, readv)
1548 self.failUnless(results[0])
1551 def build_test_sdmf_share(self, empty=False):
1555 sharedata = self.segment * 6
1556 self.sharedata = sharedata
1557 blocksize = len(sharedata) / 3
1558 block = sharedata[:blocksize]
1559 self.blockdata = block
1560 prefix = struct.pack(">BQ32s16s BBQQ",
1570 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1571 signature_offset = post_offset + len(self.verification_key)
1572 sharehashes_offset = signature_offset + len(self.signature)
1573 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1574 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1575 encprivkey_offset = sharedata_offset + len(block)
1576 eof_offset = encprivkey_offset + len(self.encprivkey)
1577 offsets = struct.pack(">LLLLQQ",
1584 final_share = "".join([prefix,
1586 self.verification_key,
1588 self.share_hash_chain_s,
1589 self.block_hash_tree_s,
1593 self.offsets['signature'] = signature_offset
1594 self.offsets['share_hash_chain'] = sharehashes_offset
1595 self.offsets['block_hash_tree'] = blockhashes_offset
1596 self.offsets['share_data'] = sharedata_offset
1597 self.offsets['enc_privkey'] = encprivkey_offset
1598 self.offsets['EOF'] = eof_offset
1602 def write_sdmf_share_to_server(self,
1605 # Some tests need SDMF shares to verify that we can still
1606 # read them. This method writes one, which resembles but is not
1608 write = self.ss.remote_slot_testv_and_readv_and_writev
1609 share = self.build_test_sdmf_share(empty)
1610 testvs = [(0, 1, "eq", "")]
1612 tws[0] = (testvs, [(0, share)], None)
1614 results = write(storage_index, self.secrets, tws, readv)
1615 self.failUnless(results[0])
1618 def test_read(self):
1619 self.write_test_share_to_server("si1")
1620 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1621 # Check that every method equals what we expect it to.
1622 d = defer.succeed(None)
1623 def _check_block_and_salt((block, salt)):
1624 self.failUnlessEqual(block, self.block)
1625 self.failUnlessEqual(salt, self.salt)
1628 d.addCallback(lambda ignored, i=i:
1629 mr.get_block_and_salt(i))
1630 d.addCallback(_check_block_and_salt)
1632 d.addCallback(lambda ignored:
1633 mr.get_encprivkey())
1634 d.addCallback(lambda encprivkey:
1635 self.failUnlessEqual(self.encprivkey, encprivkey))
1637 d.addCallback(lambda ignored:
1638 mr.get_blockhashes())
1639 d.addCallback(lambda blockhashes:
1640 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1642 d.addCallback(lambda ignored:
1643 mr.get_sharehashes())
1644 d.addCallback(lambda sharehashes:
1645 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1647 d.addCallback(lambda ignored:
1649 d.addCallback(lambda signature:
1650 self.failUnlessEqual(signature, self.signature))
1652 d.addCallback(lambda ignored:
1653 mr.get_verification_key())
1654 d.addCallback(lambda verification_key:
1655 self.failUnlessEqual(verification_key, self.verification_key))
1657 d.addCallback(lambda ignored:
1659 d.addCallback(lambda seqnum:
1660 self.failUnlessEqual(seqnum, 0))
1662 d.addCallback(lambda ignored:
1664 d.addCallback(lambda root_hash:
1665 self.failUnlessEqual(self.root_hash, root_hash))
1667 d.addCallback(lambda ignored:
1669 d.addCallback(lambda seqnum:
1670 self.failUnlessEqual(0, seqnum))
1672 d.addCallback(lambda ignored:
1673 mr.get_encoding_parameters())
1674 def _check_encoding_parameters((k, n, segsize, datalen)):
1675 self.failUnlessEqual(k, 3)
1676 self.failUnlessEqual(n, 10)
1677 self.failUnlessEqual(segsize, 6)
1678 self.failUnlessEqual(datalen, 36)
1679 d.addCallback(_check_encoding_parameters)
1681 d.addCallback(lambda ignored:
1682 mr.get_checkstring())
1683 d.addCallback(lambda checkstring:
1684 self.failUnlessEqual(checkstring, checkstring))
1688 def test_read_with_different_tail_segment_size(self):
1689 self.write_test_share_to_server("si1", tail_segment=True)
1690 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1691 d = mr.get_block_and_salt(5)
1692 def _check_tail_segment(results):
1693 block, salt = results
1694 self.failUnlessEqual(len(block), 1)
1695 self.failUnlessEqual(block, "a")
1696 d.addCallback(_check_tail_segment)
1700 def test_get_block_with_invalid_segnum(self):
1701 self.write_test_share_to_server("si1")
1702 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1703 d = defer.succeed(None)
1704 d.addCallback(lambda ignored:
1705 self.shouldFail(LayoutInvalid, "test invalid segnum",
1707 mr.get_block_and_salt, 7))
1711 def test_get_encoding_parameters_first(self):
1712 self.write_test_share_to_server("si1")
1713 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1714 d = mr.get_encoding_parameters()
1715 def _check_encoding_parameters((k, n, segment_size, datalen)):
1716 self.failUnlessEqual(k, 3)
1717 self.failUnlessEqual(n, 10)
1718 self.failUnlessEqual(segment_size, 6)
1719 self.failUnlessEqual(datalen, 36)
1720 d.addCallback(_check_encoding_parameters)
1724 def test_get_seqnum_first(self):
1725 self.write_test_share_to_server("si1")
1726 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1728 d.addCallback(lambda seqnum:
1729 self.failUnlessEqual(seqnum, 0))
1733 def test_get_root_hash_first(self):
1734 self.write_test_share_to_server("si1")
1735 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1736 d = mr.get_root_hash()
1737 d.addCallback(lambda root_hash:
1738 self.failUnlessEqual(root_hash, self.root_hash))
1742 def test_get_checkstring_first(self):
1743 self.write_test_share_to_server("si1")
1744 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1745 d = mr.get_checkstring()
1746 d.addCallback(lambda checkstring:
1747 self.failUnlessEqual(checkstring, self.checkstring))
1751 def test_write_read_vectors(self):
1752 # When writing for us, the storage server will return to us a
1753 # read vector, along with its result. If a write fails because
1754 # the test vectors failed, this read vector can help us to
1755 # diagnose the problem. This test ensures that the read vector
1756 # is working appropriately.
1757 mw = self._make_new_mw("si1", 0)
1760 mw.put_block(self.block, i, self.salt)
1761 mw.put_encprivkey(self.encprivkey)
1762 mw.put_blockhashes(self.block_hash_tree)
1763 mw.put_sharehashes(self.share_hash_chain)
1764 mw.put_root_hash(self.root_hash)
1765 mw.put_signature(self.signature)
1766 mw.put_verification_key(self.verification_key)
1767 d = mw.finish_publishing()
1769 self.failUnless(len(results), 2)
1770 result, readv = results
1771 self.failUnless(result)
1773 self.old_checkstring = mw.get_checkstring()
1774 mw.set_checkstring("")
1775 d.addCallback(_then)
1776 d.addCallback(lambda ignored:
1777 mw.finish_publishing())
1778 def _then_again(results):
1779 self.failUnlessEqual(len(results), 2)
1780 result, readvs = results
1782 self.failUnlessIn(0, readvs)
1783 readv = readvs[0][0]
1784 self.failUnlessEqual(readv, self.old_checkstring)
1785 d.addCallback(_then_again)
1786 # The checkstring remains the same for the rest of the process.
1790 def test_private_key_after_share_hash_chain(self):
1791 mw = self._make_new_mw("si1", 0)
1792 d = defer.succeed(None)
1794 d.addCallback(lambda ignored, i=i:
1795 mw.put_block(self.block, i, self.salt))
1796 d.addCallback(lambda ignored:
1797 mw.put_encprivkey(self.encprivkey))
1798 d.addCallback(lambda ignored:
1799 mw.put_sharehashes(self.share_hash_chain))
1801 # Now try to put the private key again.
1802 d.addCallback(lambda ignored:
1803 self.shouldFail(LayoutInvalid, "test repeat private key",
1805 mw.put_encprivkey, self.encprivkey))
1809 def test_signature_after_verification_key(self):
1810 mw = self._make_new_mw("si1", 0)
1811 d = defer.succeed(None)
1812 # Put everything up to and including the verification key.
1814 d.addCallback(lambda ignored, i=i:
1815 mw.put_block(self.block, i, self.salt))
1816 d.addCallback(lambda ignored:
1817 mw.put_encprivkey(self.encprivkey))
1818 d.addCallback(lambda ignored:
1819 mw.put_blockhashes(self.block_hash_tree))
1820 d.addCallback(lambda ignored:
1821 mw.put_sharehashes(self.share_hash_chain))
1822 d.addCallback(lambda ignored:
1823 mw.put_root_hash(self.root_hash))
1824 d.addCallback(lambda ignored:
1825 mw.put_signature(self.signature))
1826 d.addCallback(lambda ignored:
1827 mw.put_verification_key(self.verification_key))
1828 # Now try to put the signature again. This should fail
1829 d.addCallback(lambda ignored:
1830 self.shouldFail(LayoutInvalid, "signature after verification",
1832 mw.put_signature, self.signature))
1836 def test_uncoordinated_write(self):
1837 # Make two mutable writers, both pointing to the same storage
1838 # server, both at the same storage index, and try writing to the
1840 mw1 = self._make_new_mw("si1", 0)
1841 mw2 = self._make_new_mw("si1", 0)
1843 def _check_success(results):
1844 result, readvs = results
1845 self.failUnless(result)
1847 def _check_failure(results):
1848 result, readvs = results
1851 def _write_share(mw):
1853 mw.put_block(self.block, i, self.salt)
1854 mw.put_encprivkey(self.encprivkey)
1855 mw.put_blockhashes(self.block_hash_tree)
1856 mw.put_sharehashes(self.share_hash_chain)
1857 mw.put_root_hash(self.root_hash)
1858 mw.put_signature(self.signature)
1859 mw.put_verification_key(self.verification_key)
1860 return mw.finish_publishing()
1861 d = _write_share(mw1)
1862 d.addCallback(_check_success)
1863 d.addCallback(lambda ignored:
1865 d.addCallback(_check_failure)
1869 def test_invalid_salt_size(self):
1870 # Salts need to be 16 bytes in size. Writes that attempt to
1871 # write more or less than this should be rejected.
1872 mw = self._make_new_mw("si1", 0)
1873 invalid_salt = "a" * 17 # 17 bytes
1874 another_invalid_salt = "b" * 15 # 15 bytes
1875 d = defer.succeed(None)
1876 d.addCallback(lambda ignored:
1877 self.shouldFail(LayoutInvalid, "salt too big",
1879 mw.put_block, self.block, 0, invalid_salt))
1880 d.addCallback(lambda ignored:
1881 self.shouldFail(LayoutInvalid, "salt too small",
1883 mw.put_block, self.block, 0,
1884 another_invalid_salt))
1888 def test_write_test_vectors(self):
1889 # If we give the write proxy a bogus test vector at
1890 # any point during the process, it should fail to write when we
1892 def _check_failure(results):
1893 self.failUnlessEqual(len(results), 2)
1897 def _check_success(results):
1898 self.failUnlessEqual(len(results), 2)
1900 self.failUnless(results)
1902 mw = self._make_new_mw("si1", 0)
1903 mw.set_checkstring("this is a lie")
1905 mw.put_block(self.block, i, self.salt)
1906 mw.put_encprivkey(self.encprivkey)
1907 mw.put_blockhashes(self.block_hash_tree)
1908 mw.put_sharehashes(self.share_hash_chain)
1909 mw.put_root_hash(self.root_hash)
1910 mw.put_signature(self.signature)
1911 mw.put_verification_key(self.verification_key)
1912 d = mw.finish_publishing()
1913 d.addCallback(_check_failure)
1914 d.addCallback(lambda ignored:
1915 mw.set_checkstring(""))
1916 d.addCallback(lambda ignored:
1917 mw.finish_publishing())
1918 d.addCallback(_check_success)
1922 def serialize_blockhashes(self, blockhashes):
1923 return "".join(blockhashes)
1926 def serialize_sharehashes(self, sharehashes):
1927 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1928 for i in sorted(sharehashes.keys())])
1932 def test_write(self):
1933 # This translates to a file with 6 6-byte segments, and with 2-byte
1935 mw = self._make_new_mw("si1", 0)
1936 # Test writing some blocks.
1937 read = self.ss.remote_slot_readv
1938 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1939 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1940 PRIVATE_KEY_SIZE + \
1942 VERIFICATION_KEY_SIZE + \
1943 SHARE_HASH_CHAIN_SIZE
1944 written_block_size = 2 + len(self.salt)
1945 written_block = self.block + self.salt
1947 mw.put_block(self.block, i, self.salt)
1949 mw.put_encprivkey(self.encprivkey)
1950 mw.put_blockhashes(self.block_hash_tree)
1951 mw.put_sharehashes(self.share_hash_chain)
1952 mw.put_root_hash(self.root_hash)
1953 mw.put_signature(self.signature)
1954 mw.put_verification_key(self.verification_key)
1955 d = mw.finish_publishing()
1956 def _check_publish(results):
1957 self.failUnlessEqual(len(results), 2)
1958 result, ign = results
1959 self.failUnless(result, "publish failed")
1961 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1962 {0: [written_block]})
1964 self.failUnlessEqual(len(self.encprivkey), 7)
1965 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1966 {0: [self.encprivkey]})
1968 expected_block_hash_offset = expected_sharedata_offset + \
1969 (6 * written_block_size)
1970 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1971 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1972 {0: [self.block_hash_tree_s]})
1974 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1975 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1976 {0: [self.share_hash_chain_s]})
1978 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1979 {0: [self.root_hash]})
1980 expected_signature_offset = expected_share_hash_offset + \
1981 len(self.share_hash_chain_s)
1982 self.failUnlessEqual(len(self.signature), 9)
1983 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1984 {0: [self.signature]})
1986 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1987 self.failUnlessEqual(len(self.verification_key), 6)
1988 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1989 {0: [self.verification_key]})
1991 signable = mw.get_signable()
1992 verno, seq, roothash, k, n, segsize, datalen = \
1993 struct.unpack(">BQ32sBBQQ",
1995 self.failUnlessEqual(verno, 1)
1996 self.failUnlessEqual(seq, 0)
1997 self.failUnlessEqual(roothash, self.root_hash)
1998 self.failUnlessEqual(k, 3)
1999 self.failUnlessEqual(n, 10)
2000 self.failUnlessEqual(segsize, 6)
2001 self.failUnlessEqual(datalen, 36)
2002 expected_eof_offset = expected_block_hash_offset + \
2003 len(self.block_hash_tree_s)
2005 # Check the version number to make sure that it is correct.
2006 expected_version_number = struct.pack(">B", 1)
2007 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2008 {0: [expected_version_number]})
2009 # Check the sequence number to make sure that it is correct
2010 expected_sequence_number = struct.pack(">Q", 0)
2011 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2012 {0: [expected_sequence_number]})
2013 # Check that the encoding parameters (k, N, segement size, data
2014 # length) are what they should be. These are 3, 10, 6, 36
2015 expected_k = struct.pack(">B", 3)
2016 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2018 expected_n = struct.pack(">B", 10)
2019 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2021 expected_segment_size = struct.pack(">Q", 6)
2022 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2023 {0: [expected_segment_size]})
2024 expected_data_length = struct.pack(">Q", 36)
2025 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2026 {0: [expected_data_length]})
2027 expected_offset = struct.pack(">Q", expected_private_key_offset)
2028 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2029 {0: [expected_offset]})
2030 expected_offset = struct.pack(">Q", expected_share_hash_offset)
2031 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2032 {0: [expected_offset]})
2033 expected_offset = struct.pack(">Q", expected_signature_offset)
2034 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2035 {0: [expected_offset]})
2036 expected_offset = struct.pack(">Q", expected_verification_key_offset)
2037 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2038 {0: [expected_offset]})
2039 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2040 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2041 {0: [expected_offset]})
2042 expected_offset = struct.pack(">Q", expected_sharedata_offset)
2043 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2044 {0: [expected_offset]})
2045 expected_offset = struct.pack(">Q", expected_block_hash_offset)
2046 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2047 {0: [expected_offset]})
2048 expected_offset = struct.pack(">Q", expected_eof_offset)
2049 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2050 {0: [expected_offset]})
2051 d.addCallback(_check_publish)
2054 def _make_new_mw(self, si, share, datalength=36):
2055 # This is a file of size 36 bytes. Since it has a segment
2056 # size of 6, we know that it has 6 byte segments, which will
2057 # be split into blocks of 2 bytes because our FEC k
2059 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2064 def test_write_rejected_with_too_many_blocks(self):
2065 mw = self._make_new_mw("si0", 0)
2067 # Try writing too many blocks. We should not be able to write
2069 # blocks into each share.
2070 d = defer.succeed(None)
2072 d.addCallback(lambda ignored, i=i:
2073 mw.put_block(self.block, i, self.salt))
2074 d.addCallback(lambda ignored:
2075 self.shouldFail(LayoutInvalid, "too many blocks",
2077 mw.put_block, self.block, 7, self.salt))
2081 def test_write_rejected_with_invalid_salt(self):
2082 # Try writing an invalid salt. Salts are 16 bytes -- any more or
2083 # less should cause an error.
2084 mw = self._make_new_mw("si1", 0)
2085 bad_salt = "a" * 17 # 17 bytes
2086 d = defer.succeed(None)
2087 d.addCallback(lambda ignored:
2088 self.shouldFail(LayoutInvalid, "test_invalid_salt",
2089 None, mw.put_block, self.block, 7, bad_salt))
2093 def test_write_rejected_with_invalid_root_hash(self):
2094 # Try writing an invalid root hash. This should be SHA256d, and
2095 # 32 bytes long as a result.
2096 mw = self._make_new_mw("si2", 0)
2097 # 17 bytes != 32 bytes
2098 invalid_root_hash = "a" * 17
2099 d = defer.succeed(None)
2100 # Before this test can work, we need to put some blocks + salts,
2101 # a block hash tree, and a share hash tree. Otherwise, we'll see
2102 # failures that match what we are looking for, but are caused by
2103 # the constraints imposed on operation ordering.
2105 d.addCallback(lambda ignored, i=i:
2106 mw.put_block(self.block, i, self.salt))
2107 d.addCallback(lambda ignored:
2108 mw.put_encprivkey(self.encprivkey))
2109 d.addCallback(lambda ignored:
2110 mw.put_blockhashes(self.block_hash_tree))
2111 d.addCallback(lambda ignored:
2112 mw.put_sharehashes(self.share_hash_chain))
2113 d.addCallback(lambda ignored:
2114 self.shouldFail(LayoutInvalid, "invalid root hash",
2115 None, mw.put_root_hash, invalid_root_hash))
2119 def test_write_rejected_with_invalid_blocksize(self):
2120 # The blocksize implied by the writer that we get from
2121 # _make_new_mw is 2bytes -- any more or any less than this
2122 # should be cause for failure, unless it is the tail segment, in
2123 # which case it may not be failure.
2125 mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2127 # 1 bytes != 2 bytes
2128 d = defer.succeed(None)
2129 d.addCallback(lambda ignored, invalid_block=invalid_block:
2130 self.shouldFail(LayoutInvalid, "test blocksize too small",
2131 None, mw.put_block, invalid_block, 0,
2133 invalid_block = invalid_block * 3
2134 # 3 bytes != 2 bytes
2135 d.addCallback(lambda ignored:
2136 self.shouldFail(LayoutInvalid, "test blocksize too large",
2138 mw.put_block, invalid_block, 0, self.salt))
2140 d.addCallback(lambda ignored, i=i:
2141 mw.put_block(self.block, i, self.salt))
2142 # Try to put an invalid tail segment
2143 d.addCallback(lambda ignored:
2144 self.shouldFail(LayoutInvalid, "test invalid tail segment",
2146 mw.put_block, self.block, 5, self.salt))
2148 d.addCallback(lambda ignored:
2149 mw.put_block(valid_block, 5, self.salt))
2153 def test_write_enforces_order_constraints(self):
2154 # We require that the MDMFSlotWriteProxy be interacted with in a
2158 # 1: write blocks and salts
2159 # 2: Write the encrypted private key
2160 # 3: Write the block hashes
2161 # 4: Write the share hashes
2162 # 5: Write the root hash and salt hash
2163 # 6: Write the signature and verification key
2164 # 7: Write the file.
2166 # Some of these can be performed out-of-order, and some can't.
2167 # The dependencies that I want to test here are:
2168 # - Private key before block hashes
2169 # - share hashes and block hashes before root hash
2170 # - root hash before signature
2171 # - signature before verification key
2172 mw0 = self._make_new_mw("si0", 0)
2174 d = defer.succeed(None)
2176 d.addCallback(lambda ignored, i=i:
2177 mw0.put_block(self.block, i, self.salt))
2179 # Try to write the share hash chain without writing the
2180 # encrypted private key
2181 d.addCallback(lambda ignored:
2182 self.shouldFail(LayoutInvalid, "share hash chain before "
2185 mw0.put_sharehashes, self.share_hash_chain))
2186 # Write the private key.
2187 d.addCallback(lambda ignored:
2188 mw0.put_encprivkey(self.encprivkey))
2190 # Now write the block hashes and try again
2191 d.addCallback(lambda ignored:
2192 mw0.put_blockhashes(self.block_hash_tree))
2194 # We haven't yet put the root hash on the share, so we shouldn't
2195 # be able to sign it.
2196 d.addCallback(lambda ignored:
2197 self.shouldFail(LayoutInvalid, "signature before root hash",
2198 None, mw0.put_signature, self.signature))
2200 d.addCallback(lambda ignored:
2201 self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2203 # ..and, since that fails, we also shouldn't be able to put the
2205 d.addCallback(lambda ignored:
2206 self.shouldFail(LayoutInvalid, "key before signature",
2207 None, mw0.put_verification_key,
2208 self.verification_key))
2210 # Now write the share hashes.
2211 d.addCallback(lambda ignored:
2212 mw0.put_sharehashes(self.share_hash_chain))
2213 # We should be able to write the root hash now too
2214 d.addCallback(lambda ignored:
2215 mw0.put_root_hash(self.root_hash))
2217 # We should still be unable to put the verification key
2218 d.addCallback(lambda ignored:
2219 self.shouldFail(LayoutInvalid, "key before signature",
2220 None, mw0.put_verification_key,
2221 self.verification_key))
2223 d.addCallback(lambda ignored:
2224 mw0.put_signature(self.signature))
2226 # We shouldn't be able to write the offsets to the remote server
2227 # until the offset table is finished; IOW, until we have written
2228 # the verification key.
2229 d.addCallback(lambda ignored:
2230 self.shouldFail(LayoutInvalid, "offsets before verification key",
2232 mw0.finish_publishing))
2234 d.addCallback(lambda ignored:
2235 mw0.put_verification_key(self.verification_key))
2239 def test_end_to_end(self):
2240 mw = self._make_new_mw("si1", 0)
2241 # Write a share using the mutable writer, and make sure that the
2242 # reader knows how to read everything back to us.
2243 d = defer.succeed(None)
2245 d.addCallback(lambda ignored, i=i:
2246 mw.put_block(self.block, i, self.salt))
2247 d.addCallback(lambda ignored:
2248 mw.put_encprivkey(self.encprivkey))
2249 d.addCallback(lambda ignored:
2250 mw.put_blockhashes(self.block_hash_tree))
2251 d.addCallback(lambda ignored:
2252 mw.put_sharehashes(self.share_hash_chain))
2253 d.addCallback(lambda ignored:
2254 mw.put_root_hash(self.root_hash))
2255 d.addCallback(lambda ignored:
2256 mw.put_signature(self.signature))
2257 d.addCallback(lambda ignored:
2258 mw.put_verification_key(self.verification_key))
2259 d.addCallback(lambda ignored:
2260 mw.finish_publishing())
2262 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2263 def _check_block_and_salt((block, salt)):
2264 self.failUnlessEqual(block, self.block)
2265 self.failUnlessEqual(salt, self.salt)
2268 d.addCallback(lambda ignored, i=i:
2269 mr.get_block_and_salt(i))
2270 d.addCallback(_check_block_and_salt)
2272 d.addCallback(lambda ignored:
2273 mr.get_encprivkey())
2274 d.addCallback(lambda encprivkey:
2275 self.failUnlessEqual(self.encprivkey, encprivkey))
2277 d.addCallback(lambda ignored:
2278 mr.get_blockhashes())
2279 d.addCallback(lambda blockhashes:
2280 self.failUnlessEqual(self.block_hash_tree, blockhashes))
2282 d.addCallback(lambda ignored:
2283 mr.get_sharehashes())
2284 d.addCallback(lambda sharehashes:
2285 self.failUnlessEqual(self.share_hash_chain, sharehashes))
2287 d.addCallback(lambda ignored:
2289 d.addCallback(lambda signature:
2290 self.failUnlessEqual(signature, self.signature))
2292 d.addCallback(lambda ignored:
2293 mr.get_verification_key())
2294 d.addCallback(lambda verification_key:
2295 self.failUnlessEqual(verification_key, self.verification_key))
2297 d.addCallback(lambda ignored:
2299 d.addCallback(lambda seqnum:
2300 self.failUnlessEqual(seqnum, 0))
2302 d.addCallback(lambda ignored:
2304 d.addCallback(lambda root_hash:
2305 self.failUnlessEqual(self.root_hash, root_hash))
2307 d.addCallback(lambda ignored:
2308 mr.get_encoding_parameters())
2309 def _check_encoding_parameters((k, n, segsize, datalen)):
2310 self.failUnlessEqual(k, 3)
2311 self.failUnlessEqual(n, 10)
2312 self.failUnlessEqual(segsize, 6)
2313 self.failUnlessEqual(datalen, 36)
2314 d.addCallback(_check_encoding_parameters)
2316 d.addCallback(lambda ignored:
2317 mr.get_checkstring())
2318 d.addCallback(lambda checkstring:
2319 self.failUnlessEqual(checkstring, mw.get_checkstring()))
2323 def test_is_sdmf(self):
2324 # The MDMFSlotReadProxy should also know how to read SDMF files,
2325 # since it will encounter them on the grid. Callers use the
2326 # is_sdmf method to test this.
2327 self.write_sdmf_share_to_server("si1")
2328 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2330 d.addCallback(lambda issdmf:
2331 self.failUnless(issdmf))
2335 def test_reads_sdmf(self):
2336 # The slot read proxy should, naturally, know how to tell us
2337 # about data in the SDMF format
2338 self.write_sdmf_share_to_server("si1")
2339 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2340 d = defer.succeed(None)
2341 d.addCallback(lambda ignored:
2343 d.addCallback(lambda issdmf:
2344 self.failUnless(issdmf))
2346 # What do we need to read?
2349 d.addCallback(lambda ignored:
2350 mr.get_block_and_salt(0))
2351 def _check_block_and_salt(results):
2352 block, salt = results
2353 # Our original file is 36 bytes long. Then each share is 12
2354 # bytes in size. The share is composed entirely of the
2355 # letter a. self.block contains 2 as, so 6 * self.block is
2356 # what we are looking for.
2357 self.failUnlessEqual(block, self.block * 6)
2358 self.failUnlessEqual(salt, self.salt)
2359 d.addCallback(_check_block_and_salt)
2362 d.addCallback(lambda ignored:
2363 mr.get_blockhashes())
2364 d.addCallback(lambda blockhashes:
2365 self.failUnlessEqual(self.block_hash_tree,
2369 d.addCallback(lambda ignored:
2370 mr.get_sharehashes())
2371 d.addCallback(lambda sharehashes:
2372 self.failUnlessEqual(self.share_hash_chain,
2375 d.addCallback(lambda ignored:
2376 mr.get_encprivkey())
2377 d.addCallback(lambda encprivkey:
2378 self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2379 d.addCallback(lambda ignored:
2380 mr.get_verification_key())
2381 d.addCallback(lambda verification_key:
2382 self.failUnlessEqual(verification_key,
2383 self.verification_key,
2386 d.addCallback(lambda ignored:
2388 d.addCallback(lambda signature:
2389 self.failUnlessEqual(signature, self.signature, signature))
2391 # - The sequence number
2392 d.addCallback(lambda ignored:
2394 d.addCallback(lambda seqnum:
2395 self.failUnlessEqual(seqnum, 0, seqnum))
2398 d.addCallback(lambda ignored:
2400 d.addCallback(lambda root_hash:
2401 self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2405 def test_only_reads_one_segment_sdmf(self):
2406 # SDMF shares have only one segment, so it doesn't make sense to
2407 # read more segments than that. The reader should know this and
2408 # complain if we try to do that.
2409 self.write_sdmf_share_to_server("si1")
2410 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2411 d = defer.succeed(None)
2412 d.addCallback(lambda ignored:
2414 d.addCallback(lambda issdmf:
2415 self.failUnless(issdmf))
2416 d.addCallback(lambda ignored:
2417 self.shouldFail(LayoutInvalid, "test bad segment",
2419 mr.get_block_and_salt, 1))
2423 def test_read_with_prefetched_mdmf_data(self):
2424 # The MDMFSlotReadProxy will prefill certain fields if you pass
2425 # it data that you have already fetched. This is useful for
2426 # cases like the Servermap, which prefetches ~2kb of data while
2427 # finding out which shares are on the remote peer so that it
2428 # doesn't waste round trips.
2429 mdmf_data = self.build_test_mdmf_share()
2430 self.write_test_share_to_server("si1")
2431 def _make_mr(ignored, length):
2432 mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2435 d = defer.succeed(None)
2436 # This should be enough to fill in both the encoding parameters
2437 # and the table of offsets, which will complete the version
2438 # information tuple.
2439 d.addCallback(_make_mr, 123)
2440 d.addCallback(lambda mr:
2442 def _check_verinfo(verinfo):
2443 self.failUnless(verinfo)
2444 self.failUnlessEqual(len(verinfo), 9)
2454 self.failUnlessEqual(seqnum, 0)
2455 self.failUnlessEqual(root_hash, self.root_hash)
2456 self.failUnlessEqual(segsize, 6)
2457 self.failUnlessEqual(datalen, 36)
2458 self.failUnlessEqual(k, 3)
2459 self.failUnlessEqual(n, 10)
2460 expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2468 self.failUnlessEqual(expected_prefix, prefix)
2469 self.failUnlessEqual(self.rref.read_count, 0)
2470 d.addCallback(_check_verinfo)
2471 # This is not enough data to read a block and a share, so the
2472 # wrapper should attempt to read this from the remote server.
2473 d.addCallback(_make_mr, 123)
2474 d.addCallback(lambda mr:
2475 mr.get_block_and_salt(0))
2476 def _check_block_and_salt((block, salt)):
2477 self.failUnlessEqual(block, self.block)
2478 self.failUnlessEqual(salt, self.salt)
2479 self.failUnlessEqual(self.rref.read_count, 1)
2480 # This should be enough data to read one block.
2481 d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2482 d.addCallback(lambda mr:
2483 mr.get_block_and_salt(0))
2484 d.addCallback(_check_block_and_salt)
2488 def test_read_with_prefetched_sdmf_data(self):
2489 sdmf_data = self.build_test_sdmf_share()
2490 self.write_sdmf_share_to_server("si1")
2491 def _make_mr(ignored, length):
2492 mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2495 d = defer.succeed(None)
2496 # This should be enough to get us the encoding parameters,
2497 # offset table, and everything else we need to build a verinfo
2499 d.addCallback(_make_mr, 123)
2500 d.addCallback(lambda mr:
2502 def _check_verinfo(verinfo):
2503 self.failUnless(verinfo)
2504 self.failUnlessEqual(len(verinfo), 9)
2514 self.failUnlessEqual(seqnum, 0)
2515 self.failUnlessEqual(root_hash, self.root_hash)
2516 self.failUnlessEqual(salt, self.salt)
2517 self.failUnlessEqual(segsize, 36)
2518 self.failUnlessEqual(datalen, 36)
2519 self.failUnlessEqual(k, 3)
2520 self.failUnlessEqual(n, 10)
2521 expected_prefix = struct.pack(SIGNED_PREFIX,
2530 self.failUnlessEqual(expected_prefix, prefix)
2531 self.failUnlessEqual(self.rref.read_count, 0)
2532 d.addCallback(_check_verinfo)
2533 # This shouldn't be enough to read any share data.
2534 d.addCallback(_make_mr, 123)
2535 d.addCallback(lambda mr:
2536 mr.get_block_and_salt(0))
2537 def _check_block_and_salt((block, salt)):
2538 self.failUnlessEqual(block, self.block * 6)
2539 self.failUnlessEqual(salt, self.salt)
2540 # TODO: Fix the read routine so that it reads only the data
2541 # that it has cached if it can't read all of it.
2542 self.failUnlessEqual(self.rref.read_count, 2)
2544 # This should be enough to read share data.
2545 d.addCallback(_make_mr, self.offsets['share_data'])
2546 d.addCallback(lambda mr:
2547 mr.get_block_and_salt(0))
2548 d.addCallback(_check_block_and_salt)
2552 def test_read_with_empty_mdmf_file(self):
2553 # Some tests upload a file with no contents to test things
2554 # unrelated to the actual handling of the content of the file.
2555 # The reader should behave intelligently in these cases.
2556 self.write_test_share_to_server("si1", empty=True)
2557 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2558 # We should be able to get the encoding parameters, and they
2559 # should be correct.
2560 d = defer.succeed(None)
2561 d.addCallback(lambda ignored:
2562 mr.get_encoding_parameters())
2563 def _check_encoding_parameters(params):
2564 self.failUnlessEqual(len(params), 4)
2565 k, n, segsize, datalen = params
2566 self.failUnlessEqual(k, 3)
2567 self.failUnlessEqual(n, 10)
2568 self.failUnlessEqual(segsize, 0)
2569 self.failUnlessEqual(datalen, 0)
2570 d.addCallback(_check_encoding_parameters)
2572 # We should not be able to fetch a block, since there are no
2574 d.addCallback(lambda ignored:
2575 self.shouldFail(LayoutInvalid, "get block on empty file",
2577 mr.get_block_and_salt, 0))
2581 def test_read_with_empty_sdmf_file(self):
2582 self.write_sdmf_share_to_server("si1", empty=True)
2583 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2584 # We should be able to get the encoding parameters, and they
2586 d = defer.succeed(None)
2587 d.addCallback(lambda ignored:
2588 mr.get_encoding_parameters())
2589 def _check_encoding_parameters(params):
2590 self.failUnlessEqual(len(params), 4)
2591 k, n, segsize, datalen = params
2592 self.failUnlessEqual(k, 3)
2593 self.failUnlessEqual(n, 10)
2594 self.failUnlessEqual(segsize, 0)
2595 self.failUnlessEqual(datalen, 0)
2596 d.addCallback(_check_encoding_parameters)
2598 # It does not make sense to get a block in this format, so we
2599 # should not be able to.
2600 d.addCallback(lambda ignored:
2601 self.shouldFail(LayoutInvalid, "get block on an empty file",
2603 mr.get_block_and_salt, 0))
2607 def test_verinfo_with_sdmf_file(self):
2608 self.write_sdmf_share_to_server("si1")
2609 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2610 # We should be able to get the version information.
2611 d = defer.succeed(None)
2612 d.addCallback(lambda ignored:
2614 def _check_verinfo(verinfo):
2615 self.failUnless(verinfo)
2616 self.failUnlessEqual(len(verinfo), 9)
2626 self.failUnlessEqual(seqnum, 0)
2627 self.failUnlessEqual(root_hash, self.root_hash)
2628 self.failUnlessEqual(salt, self.salt)
2629 self.failUnlessEqual(segsize, 36)
2630 self.failUnlessEqual(datalen, 36)
2631 self.failUnlessEqual(k, 3)
2632 self.failUnlessEqual(n, 10)
2633 expected_prefix = struct.pack(">BQ32s16s BBQQ",
2642 self.failUnlessEqual(prefix, expected_prefix)
2643 self.failUnlessEqual(offsets, self.offsets)
2644 d.addCallback(_check_verinfo)
2648 def test_verinfo_with_mdmf_file(self):
2649 self.write_test_share_to_server("si1")
2650 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2651 d = defer.succeed(None)
2652 d.addCallback(lambda ignored:
2654 def _check_verinfo(verinfo):
2655 self.failUnless(verinfo)
2656 self.failUnlessEqual(len(verinfo), 9)
2666 self.failUnlessEqual(seqnum, 0)
2667 self.failUnlessEqual(root_hash, self.root_hash)
2669 self.failUnlessEqual(segsize, 6)
2670 self.failUnlessEqual(datalen, 36)
2671 self.failUnlessEqual(k, 3)
2672 self.failUnlessEqual(n, 10)
2673 expected_prefix = struct.pack(">BQ32s BBQQ",
2681 self.failUnlessEqual(prefix, expected_prefix)
2682 self.failUnlessEqual(offsets, self.offsets)
2683 d.addCallback(_check_verinfo)
2687 def test_sdmf_writer(self):
2688 # Go through the motions of writing an SDMF share to the storage
2689 # server. Then read the storage server to see that the share got
2690 # written in the way that we think it should have.
2692 # We do this first so that the necessary instance variables get
2693 # set the way we want them for the tests below.
2694 data = self.build_test_sdmf_share()
2695 sdmfr = SDMFSlotWriteProxy(0,
2700 # Put the block and salt.
2701 sdmfr.put_block(self.blockdata, 0, self.salt)
2703 # Put the encprivkey
2704 sdmfr.put_encprivkey(self.encprivkey)
2706 # Put the block and share hash chains
2707 sdmfr.put_blockhashes(self.block_hash_tree)
2708 sdmfr.put_sharehashes(self.share_hash_chain)
2709 sdmfr.put_root_hash(self.root_hash)
2712 sdmfr.put_signature(self.signature)
2714 # Put the verification key
2715 sdmfr.put_verification_key(self.verification_key)
2717 # Now check to make sure that nothing has been written yet.
2718 self.failUnlessEqual(self.rref.write_count, 0)
2720 # Now finish publishing
2721 d = sdmfr.finish_publishing()
2723 self.failUnlessEqual(self.rref.write_count, 1)
2724 read = self.ss.remote_slot_readv
2725 self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2727 d.addCallback(_then)
2731 def test_sdmf_writer_preexisting_share(self):
2732 data = self.build_test_sdmf_share()
2733 self.write_sdmf_share_to_server("si1")
2735 # Now there is a share on the storage server. To successfully
2736 # write, we need to set the checkstring correctly. When we
2737 # don't, no write should occur.
2738 sdmfw = SDMFSlotWriteProxy(0,
2743 sdmfw.put_block(self.blockdata, 0, self.salt)
2745 # Put the encprivkey
2746 sdmfw.put_encprivkey(self.encprivkey)
2748 # Put the block and share hash chains
2749 sdmfw.put_blockhashes(self.block_hash_tree)
2750 sdmfw.put_sharehashes(self.share_hash_chain)
2753 sdmfw.put_root_hash(self.root_hash)
2756 sdmfw.put_signature(self.signature)
2758 # Put the verification key
2759 sdmfw.put_verification_key(self.verification_key)
2761 # We shouldn't have a checkstring yet
2762 self.failUnlessEqual(sdmfw.get_checkstring(), "")
2764 d = sdmfw.finish_publishing()
2766 self.failIf(results[0])
2767 # this is the correct checkstring
2768 self._expected_checkstring = results[1][0][0]
2769 return self._expected_checkstring
2771 d.addCallback(_then)
2772 d.addCallback(sdmfw.set_checkstring)
2773 d.addCallback(lambda ignored:
2774 sdmfw.get_checkstring())
2775 d.addCallback(lambda checkstring:
2776 self.failUnlessEqual(checkstring, self._expected_checkstring))
2777 d.addCallback(lambda ignored:
2778 sdmfw.finish_publishing())
2779 def _then_again(results):
2780 self.failUnless(results[0])
2781 read = self.ss.remote_slot_readv
2782 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2783 {0: [struct.pack(">Q", 1)]})
2784 self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2786 d.addCallback(_then_again)
2790 class Stats(unittest.TestCase):
2793 self.sparent = LoggingServiceParent()
2794 self._lease_secret = itertools.count()
2796 return self.sparent.stopService()
2798 def workdir(self, name):
2799 basedir = os.path.join("storage", "Server", name)
2802 def create(self, name):
2803 workdir = self.workdir(name)
2804 ss = StorageServer(workdir, "\x00" * 20)
2805 ss.setServiceParent(self.sparent)
2808 def test_latencies(self):
2809 ss = self.create("test_latencies")
2810 for i in range(10000):
2811 ss.add_latency("allocate", 1.0 * i)
2812 for i in range(1000):
2813 ss.add_latency("renew", 1.0 * i)
2815 ss.add_latency("write", 1.0 * i)
2817 ss.add_latency("cancel", 2.0 * i)
2818 ss.add_latency("get", 5.0)
2820 output = ss.get_latencies()
2822 self.failUnlessEqual(sorted(output.keys()),
2823 sorted(["allocate", "renew", "cancel", "write", "get"]))
2824 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2825 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2826 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2827 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2828 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2829 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2830 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2831 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2832 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2834 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2835 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2836 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
2837 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2838 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2839 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2840 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2841 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2842 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2844 self.failUnlessEqual(len(ss.latencies["write"]), 20)
2845 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2846 self.failUnless(output["write"]["01_0_percentile"] is None, output)
2847 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
2848 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2849 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2850 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2851 self.failUnless(output["write"]["99_0_percentile"] is None, output)
2852 self.failUnless(output["write"]["99_9_percentile"] is None, output)
2854 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2855 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2856 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2857 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
2858 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2859 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2860 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2861 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2862 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2864 self.failUnlessEqual(len(ss.latencies["get"]), 1)
2865 self.failUnless(output["get"]["mean"] is None, output)
2866 self.failUnless(output["get"]["01_0_percentile"] is None, output)
2867 self.failUnless(output["get"]["10_0_percentile"] is None, output)
2868 self.failUnless(output["get"]["50_0_percentile"] is None, output)
2869 self.failUnless(output["get"]["90_0_percentile"] is None, output)
2870 self.failUnless(output["get"]["95_0_percentile"] is None, output)
2871 self.failUnless(output["get"]["99_0_percentile"] is None, output)
2872 self.failUnless(output["get"]["99_9_percentile"] is None, output)
2875 s = re.sub(r'<[^>]*>', ' ', s)
2876 s = re.sub(r'\s+', ' ', s)
2879 class MyBucketCountingCrawler(BucketCountingCrawler):
2880 def finished_prefix(self, cycle, prefix):
2881 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2883 d = self.hook_ds.pop(0)
2886 class MyStorageServer(StorageServer):
2887 def add_bucket_counter(self):
2888 statefile = os.path.join(self.storedir, "bucket_counter.state")
2889 self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2890 self.bucket_counter.setServiceParent(self)
2892 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2895 self.s = service.MultiService()
2896 self.s.startService()
2898 return self.s.stopService()
2900 def test_bucket_counter(self):
2901 basedir = "storage/BucketCounter/bucket_counter"
2902 fileutil.make_dirs(basedir)
2903 ss = StorageServer(basedir, "\x00" * 20)
2904 # to make sure we capture the bucket-counting-crawler in the middle
2905 # of a cycle, we reach in and reduce its maximum slice time to 0. We
2906 # also make it start sooner than usual.
2907 ss.bucket_counter.slow_start = 0
2908 orig_cpu_slice = ss.bucket_counter.cpu_slice
2909 ss.bucket_counter.cpu_slice = 0
2910 ss.setServiceParent(self.s)
2912 w = StorageStatus(ss)
2914 # this sample is before the crawler has started doing anything
2915 html = w.renderSynchronously()
2916 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2917 s = remove_tags(html)
2918 self.failUnlessIn("Accepting new shares: Yes", s)
2919 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2920 self.failUnlessIn("Total buckets: Not computed yet", s)
2921 self.failUnlessIn("Next crawl in", s)
2923 # give the bucket-counting-crawler one tick to get started. The
2924 # cpu_slice=0 will force it to yield right after it processes the
2927 d = fireEventually()
2928 def _check(ignored):
2929 # are we really right after the first prefix?
2930 state = ss.bucket_counter.get_state()
2931 if state["last-complete-prefix"] is None:
2932 d2 = fireEventually()
2933 d2.addCallback(_check)
2935 self.failUnlessEqual(state["last-complete-prefix"],
2936 ss.bucket_counter.prefixes[0])
2937 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2938 html = w.renderSynchronously()
2939 s = remove_tags(html)
2940 self.failUnlessIn(" Current crawl ", s)
2941 self.failUnlessIn(" (next work in ", s)
2942 d.addCallback(_check)
2944 # now give it enough time to complete a full cycle
2946 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2947 d.addCallback(lambda ignored: self.poll(_watch))
2948 def _check2(ignored):
2949 ss.bucket_counter.cpu_slice = orig_cpu_slice
2950 html = w.renderSynchronously()
2951 s = remove_tags(html)
2952 self.failUnlessIn("Total buckets: 0 (the number of", s)
2953 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2954 d.addCallback(_check2)
2957 def test_bucket_counter_cleanup(self):
2958 basedir = "storage/BucketCounter/bucket_counter_cleanup"
2959 fileutil.make_dirs(basedir)
2960 ss = StorageServer(basedir, "\x00" * 20)
2961 # to make sure we capture the bucket-counting-crawler in the middle
2962 # of a cycle, we reach in and reduce its maximum slice time to 0.
2963 ss.bucket_counter.slow_start = 0
2964 orig_cpu_slice = ss.bucket_counter.cpu_slice
2965 ss.bucket_counter.cpu_slice = 0
2966 ss.setServiceParent(self.s)
2968 d = fireEventually()
2970 def _after_first_prefix(ignored):
2971 state = ss.bucket_counter.state
2972 if state["last-complete-prefix"] is None:
2973 d2 = fireEventually()
2974 d2.addCallback(_after_first_prefix)
2976 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2977 # now sneak in and mess with its state, to make sure it cleans up
2978 # properly at the end of the cycle
2979 self.failUnlessEqual(state["last-complete-prefix"],
2980 ss.bucket_counter.prefixes[0])
2981 state["bucket-counts"][-12] = {}
2982 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2983 ss.bucket_counter.save_state()
2984 d.addCallback(_after_first_prefix)
2986 # now give it enough time to complete a cycle
2988 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2989 d.addCallback(lambda ignored: self.poll(_watch))
2990 def _check2(ignored):
2991 ss.bucket_counter.cpu_slice = orig_cpu_slice
2992 s = ss.bucket_counter.get_state()
2993 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2994 self.failIf("bogusprefix!" in s["storage-index-samples"],
2995 s["storage-index-samples"].keys())
2996 d.addCallback(_check2)
2999 def test_bucket_counter_eta(self):
3000 basedir = "storage/BucketCounter/bucket_counter_eta"
3001 fileutil.make_dirs(basedir)
3002 ss = MyStorageServer(basedir, "\x00" * 20)
3003 ss.bucket_counter.slow_start = 0
3004 # these will be fired inside finished_prefix()
3005 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
3006 w = StorageStatus(ss)
3008 d = defer.Deferred()
3010 def _check_1(ignored):
3011 # no ETA is available yet
3012 html = w.renderSynchronously()
3013 s = remove_tags(html)
3014 self.failUnlessIn("complete (next work", s)
3016 def _check_2(ignored):
3017 # one prefix has finished, so an ETA based upon that elapsed time
3018 # should be available.
3019 html = w.renderSynchronously()
3020 s = remove_tags(html)
3021 self.failUnlessIn("complete (ETA ", s)
3023 def _check_3(ignored):
3024 # two prefixes have finished
3025 html = w.renderSynchronously()
3026 s = remove_tags(html)
3027 self.failUnlessIn("complete (ETA ", s)
3030 hooks[0].addCallback(_check_1).addErrback(d.errback)
3031 hooks[1].addCallback(_check_2).addErrback(d.errback)
3032 hooks[2].addCallback(_check_3).addErrback(d.errback)
3034 ss.setServiceParent(self.s)
3037 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3038 stop_after_first_bucket = False
3039 def process_bucket(self, *args, **kwargs):
3040 LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3041 if self.stop_after_first_bucket:
3042 self.stop_after_first_bucket = False
3043 self.cpu_slice = -1.0
3044 def yielding(self, sleep_time):
3045 if not self.stop_after_first_bucket:
3046 self.cpu_slice = 500
3048 class BrokenStatResults:
3050 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3053 bsr = BrokenStatResults()
3054 for attrname in dir(s):
3055 if attrname.startswith("_"):
3057 if attrname == "st_blocks":
3059 setattr(bsr, attrname, getattr(s, attrname))
3062 class InstrumentedStorageServer(StorageServer):
3063 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3064 class No_ST_BLOCKS_StorageServer(StorageServer):
3065 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3067 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3070 self.s = service.MultiService()
3071 self.s.startService()
3073 return self.s.stopService()
3075 def make_shares(self, ss):
3077 return (si, hashutil.tagged_hash("renew", si),
3078 hashutil.tagged_hash("cancel", si))
3079 def make_mutable(si):
3080 return (si, hashutil.tagged_hash("renew", si),
3081 hashutil.tagged_hash("cancel", si),
3082 hashutil.tagged_hash("write-enabler", si))
3083 def make_extra_lease(si, num):
3084 return (hashutil.tagged_hash("renew-%d" % num, si),
3085 hashutil.tagged_hash("cancel-%d" % num, si))
3087 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3088 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3089 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3090 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3091 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3092 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3094 canary = FakeCanary()
3095 # note: 'tahoe debug dump-share' will not handle this file, since the
3096 # inner contents are not a valid CHK share
3097 data = "\xff" * 1000
3099 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3101 w[0].remote_write(0, data)
3104 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3106 w[0].remote_write(0, data)
3108 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3110 writev = ss.remote_slot_testv_and_readv_and_writev
3111 writev(mutable_si_2, (we2, rs2, cs2),
3112 {0: ([], [(0,data)], len(data))}, [])
3113 writev(mutable_si_3, (we3, rs3, cs3),
3114 {0: ([], [(0,data)], len(data))}, [])
3115 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3117 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3118 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3119 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3121 def test_basic(self):
3122 basedir = "storage/LeaseCrawler/basic"
3123 fileutil.make_dirs(basedir)
3124 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3125 # make it start sooner than usual.
3126 lc = ss.lease_checker
3129 lc.stop_after_first_bucket = True
3130 webstatus = StorageStatus(ss)
3132 # create a few shares, with some leases on them
3133 self.make_shares(ss)
3134 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3136 # add a non-sharefile to exercise another code path
3137 fn = os.path.join(ss.sharedir,
3138 storage_index_to_dir(immutable_si_0),
3141 f.write("I am not a share.\n")
3144 # this is before the crawl has started, so we're not in a cycle yet
3145 initial_state = lc.get_state()
3146 self.failIf(lc.get_progress()["cycle-in-progress"])
3147 self.failIfIn("cycle-to-date", initial_state)
3148 self.failIfIn("estimated-remaining-cycle", initial_state)
3149 self.failIfIn("estimated-current-cycle", initial_state)
3150 self.failUnlessIn("history", initial_state)
3151 self.failUnlessEqual(initial_state["history"], {})
3153 ss.setServiceParent(self.s)
3157 d = fireEventually()
3159 # now examine the state right after the first bucket has been
3161 def _after_first_bucket(ignored):
3162 initial_state = lc.get_state()
3163 if "cycle-to-date" not in initial_state:
3164 d2 = fireEventually()
3165 d2.addCallback(_after_first_bucket)
3167 self.failUnlessIn("cycle-to-date", initial_state)
3168 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3169 self.failUnlessIn("estimated-current-cycle", initial_state)
3170 self.failUnlessIn("history", initial_state)
3171 self.failUnlessEqual(initial_state["history"], {})
3173 so_far = initial_state["cycle-to-date"]
3174 self.failUnlessEqual(so_far["expiration-enabled"], False)
3175 self.failUnlessIn("configured-expiration-mode", so_far)
3176 self.failUnlessIn("lease-age-histogram", so_far)
3177 lah = so_far["lease-age-histogram"]
3178 self.failUnlessEqual(type(lah), list)
3179 self.failUnlessEqual(len(lah), 1)
3180 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3181 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3182 self.failUnlessEqual(so_far["corrupt-shares"], [])
3183 sr1 = so_far["space-recovered"]
3184 self.failUnlessEqual(sr1["examined-buckets"], 1)
3185 self.failUnlessEqual(sr1["examined-shares"], 1)
3186 self.failUnlessEqual(sr1["actual-shares"], 0)
3187 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3188 self.failUnlessEqual(sr1["original-sharebytes"], 0)
3189 left = initial_state["estimated-remaining-cycle"]
3190 sr2 = left["space-recovered"]
3191 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3192 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3193 self.failIfEqual(sr2["actual-shares"], None)
3194 self.failIfEqual(sr2["configured-diskbytes"], None)
3195 self.failIfEqual(sr2["original-sharebytes"], None)
3196 d.addCallback(_after_first_bucket)
3197 d.addCallback(lambda ign: self.render1(webstatus))
3198 def _check_html_in_cycle(html):
3199 s = remove_tags(html)
3200 self.failUnlessIn("So far, this cycle has examined "
3201 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3202 self.failUnlessIn("and has recovered: "
3203 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3204 "0 B (0 B / 0 B)", s)
3205 self.failUnlessIn("If expiration were enabled, "
3206 "we would have recovered: "
3207 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3208 " 0 B (0 B / 0 B) by now", s)
3209 self.failUnlessIn("and the remainder of this cycle "
3210 "would probably recover: "
3211 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3212 " 0 B (0 B / 0 B)", s)
3213 self.failUnlessIn("and the whole cycle would probably recover: "
3214 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3215 " 0 B (0 B / 0 B)", s)
3216 self.failUnlessIn("if we were strictly using each lease's default "
3217 "31-day lease lifetime", s)
3218 self.failUnlessIn("this cycle would be expected to recover: ", s)
3219 d.addCallback(_check_html_in_cycle)
3221 # wait for the crawler to finish the first cycle. Nothing should have
3224 return bool(lc.get_state()["last-cycle-finished"] is not None)
3225 d.addCallback(lambda ign: self.poll(_wait))
3227 def _after_first_cycle(ignored):
3229 self.failIf("cycle-to-date" in s)
3230 self.failIf("estimated-remaining-cycle" in s)
3231 self.failIf("estimated-current-cycle" in s)
3232 last = s["history"][0]
3233 self.failUnlessIn("cycle-start-finish-times", last)
3234 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3235 self.failUnlessEqual(last["expiration-enabled"], False)
3236 self.failUnlessIn("configured-expiration-mode", last)
3238 self.failUnlessIn("lease-age-histogram", last)
3239 lah = last["lease-age-histogram"]
3240 self.failUnlessEqual(type(lah), list)
3241 self.failUnlessEqual(len(lah), 1)
3242 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3244 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3245 self.failUnlessEqual(last["corrupt-shares"], [])
3247 rec = last["space-recovered"]
3248 self.failUnlessEqual(rec["examined-buckets"], 4)
3249 self.failUnlessEqual(rec["examined-shares"], 4)
3250 self.failUnlessEqual(rec["actual-buckets"], 0)
3251 self.failUnlessEqual(rec["original-buckets"], 0)
3252 self.failUnlessEqual(rec["configured-buckets"], 0)
3253 self.failUnlessEqual(rec["actual-shares"], 0)
3254 self.failUnlessEqual(rec["original-shares"], 0)
3255 self.failUnlessEqual(rec["configured-shares"], 0)
3256 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3257 self.failUnlessEqual(rec["original-diskbytes"], 0)
3258 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3259 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3260 self.failUnlessEqual(rec["original-sharebytes"], 0)
3261 self.failUnlessEqual(rec["configured-sharebytes"], 0)
3263 def _get_sharefile(si):
3264 return list(ss._iter_share_files(si))[0]
3265 def count_leases(si):
3266 return len(list(_get_sharefile(si).get_leases()))
3267 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3268 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3269 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3270 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3271 d.addCallback(_after_first_cycle)
3272 d.addCallback(lambda ign: self.render1(webstatus))
3273 def _check_html(html):
3274 s = remove_tags(html)
3275 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3276 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3277 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3278 "(2 mutable / 2 immutable),", s)
3279 self.failUnlessIn("but expiration was not enabled", s)
3280 d.addCallback(_check_html)
3281 d.addCallback(lambda ign: self.render_json(webstatus))
3282 def _check_json(json):
3283 data = simplejson.loads(json)
3284 self.failUnlessIn("lease-checker", data)
3285 self.failUnlessIn("lease-checker-progress", data)
3286 d.addCallback(_check_json)
3289 def backdate_lease(self, sf, renew_secret, new_expire_time):
3290 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3291 # "renew" a lease with a new_expire_time that is older than what the
3292 # current lease has), so we have to reach inside it.
3293 for i,lease in enumerate(sf.get_leases()):
3294 if lease.renew_secret == renew_secret:
3295 lease.expiration_time = new_expire_time
3296 f = open(sf.home, 'rb+')
3297 sf._write_lease_record(f, i, lease)
3300 raise IndexError("unable to renew non-existent lease")
3302 def test_expire_age(self):
3303 basedir = "storage/LeaseCrawler/expire_age"
3304 fileutil.make_dirs(basedir)
3305 # setting expiration_time to 2000 means that any lease which is more
3306 # than 2000s old will be expired.
3307 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3308 expiration_enabled=True,
3309 expiration_mode="age",
3310 expiration_override_lease_duration=2000)
3311 # make it start sooner than usual.
3312 lc = ss.lease_checker
3314 lc.stop_after_first_bucket = True
3315 webstatus = StorageStatus(ss)
3317 # create a few shares, with some leases on them
3318 self.make_shares(ss)
3319 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3321 def count_shares(si):
3322 return len(list(ss._iter_share_files(si)))
3323 def _get_sharefile(si):
3324 return list(ss._iter_share_files(si))[0]
3325 def count_leases(si):
3326 return len(list(_get_sharefile(si).get_leases()))
3328 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3329 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3330 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3331 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3332 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3333 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3334 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3335 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3337 # artificially crank back the expiration time on the first lease of
3338 # each share, to make it look like it expired already (age=1000s).
3339 # Some shares have an extra lease which is set to expire at the
3340 # default time in 31 days from now (age=31days). We then run the
3341 # crawler, which will expire the first lease, making some shares get
3342 # deleted and others stay alive (with one remaining lease)
3345 sf0 = _get_sharefile(immutable_si_0)
3346 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3347 sf0_size = os.stat(sf0.home).st_size
3349 # immutable_si_1 gets an extra lease
3350 sf1 = _get_sharefile(immutable_si_1)
3351 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3353 sf2 = _get_sharefile(mutable_si_2)
3354 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3355 sf2_size = os.stat(sf2.home).st_size
3357 # mutable_si_3 gets an extra lease
3358 sf3 = _get_sharefile(mutable_si_3)
3359 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3361 ss.setServiceParent(self.s)
3363 d = fireEventually()
3364 # examine the state right after the first bucket has been processed
3365 def _after_first_bucket(ignored):
3366 p = lc.get_progress()
3367 if not p["cycle-in-progress"]:
3368 d2 = fireEventually()
3369 d2.addCallback(_after_first_bucket)
3371 d.addCallback(_after_first_bucket)
3372 d.addCallback(lambda ign: self.render1(webstatus))
3373 def _check_html_in_cycle(html):
3374 s = remove_tags(html)
3375 # the first bucket encountered gets deleted, and its prefix
3376 # happens to be about 1/5th of the way through the ring, so the
3377 # predictor thinks we'll have 5 shares and that we'll delete them
3378 # all. This part of the test depends upon the SIs landing right
3379 # where they do now.
3380 self.failUnlessIn("The remainder of this cycle is expected to "
3381 "recover: 4 shares, 4 buckets", s)
3382 self.failUnlessIn("The whole cycle is expected to examine "
3383 "5 shares in 5 buckets and to recover: "
3384 "5 shares, 5 buckets", s)
3385 d.addCallback(_check_html_in_cycle)
3387 # wait for the crawler to finish the first cycle. Two shares should
3390 return bool(lc.get_state()["last-cycle-finished"] is not None)
3391 d.addCallback(lambda ign: self.poll(_wait))
3393 def _after_first_cycle(ignored):
3394 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3395 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3396 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3397 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3398 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3399 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3402 last = s["history"][0]
3404 self.failUnlessEqual(last["expiration-enabled"], True)
3405 self.failUnlessEqual(last["configured-expiration-mode"],
3406 ("age", 2000, None, ("mutable", "immutable")))
3407 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3409 rec = last["space-recovered"]
3410 self.failUnlessEqual(rec["examined-buckets"], 4)
3411 self.failUnlessEqual(rec["examined-shares"], 4)
3412 self.failUnlessEqual(rec["actual-buckets"], 2)
3413 self.failUnlessEqual(rec["original-buckets"], 2)
3414 self.failUnlessEqual(rec["configured-buckets"], 2)
3415 self.failUnlessEqual(rec["actual-shares"], 2)
3416 self.failUnlessEqual(rec["original-shares"], 2)
3417 self.failUnlessEqual(rec["configured-shares"], 2)
3418 size = sf0_size + sf2_size
3419 self.failUnlessEqual(rec["actual-sharebytes"], size)
3420 self.failUnlessEqual(rec["original-sharebytes"], size)
3421 self.failUnlessEqual(rec["configured-sharebytes"], size)
3422 # different platforms have different notions of "blocks used by
3423 # this file", so merely assert that it's a number
3424 self.failUnless(rec["actual-diskbytes"] >= 0,
3425 rec["actual-diskbytes"])
3426 self.failUnless(rec["original-diskbytes"] >= 0,
3427 rec["original-diskbytes"])
3428 self.failUnless(rec["configured-diskbytes"] >= 0,
3429 rec["configured-diskbytes"])
3430 d.addCallback(_after_first_cycle)
3431 d.addCallback(lambda ign: self.render1(webstatus))
3432 def _check_html(html):
3433 s = remove_tags(html)
3434 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3435 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3436 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3437 d.addCallback(_check_html)
3440 def test_expire_cutoff_date(self):
3441 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3442 fileutil.make_dirs(basedir)
3443 # setting cutoff-date to 2000 seconds ago means that any lease which
3444 # is more than 2000s old will be expired.
3446 then = int(now - 2000)
3447 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3448 expiration_enabled=True,
3449 expiration_mode="cutoff-date",
3450 expiration_cutoff_date=then)
3451 # make it start sooner than usual.
3452 lc = ss.lease_checker
3454 lc.stop_after_first_bucket = True
3455 webstatus = StorageStatus(ss)
3457 # create a few shares, with some leases on them
3458 self.make_shares(ss)
3459 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3461 def count_shares(si):
3462 return len(list(ss._iter_share_files(si)))
3463 def _get_sharefile(si):
3464 return list(ss._iter_share_files(si))[0]
3465 def count_leases(si):
3466 return len(list(_get_sharefile(si).get_leases()))
3468 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3469 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3470 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3471 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3472 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3473 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3474 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3475 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3477 # artificially crank back the expiration time on the first lease of
3478 # each share, to make it look like was renewed 3000s ago. To achieve
3479 # this, we need to set the expiration time to now-3000+31days. This
3480 # will change when the lease format is improved to contain both
3481 # create/renew time and duration.
3482 new_expiration_time = now - 3000 + 31*24*60*60
3484 # Some shares have an extra lease which is set to expire at the
3485 # default time in 31 days from now (age=31days). We then run the
3486 # crawler, which will expire the first lease, making some shares get
3487 # deleted and others stay alive (with one remaining lease)
3489 sf0 = _get_sharefile(immutable_si_0)
3490 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3491 sf0_size = os.stat(sf0.home).st_size
3493 # immutable_si_1 gets an extra lease
3494 sf1 = _get_sharefile(immutable_si_1)
3495 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3497 sf2 = _get_sharefile(mutable_si_2)
3498 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3499 sf2_size = os.stat(sf2.home).st_size
3501 # mutable_si_3 gets an extra lease
3502 sf3 = _get_sharefile(mutable_si_3)
3503 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3505 ss.setServiceParent(self.s)
3507 d = fireEventually()
3508 # examine the state right after the first bucket has been processed
3509 def _after_first_bucket(ignored):
3510 p = lc.get_progress()
3511 if not p["cycle-in-progress"]:
3512 d2 = fireEventually()
3513 d2.addCallback(_after_first_bucket)
3515 d.addCallback(_after_first_bucket)
3516 d.addCallback(lambda ign: self.render1(webstatus))
3517 def _check_html_in_cycle(html):
3518 s = remove_tags(html)
3519 # the first bucket encountered gets deleted, and its prefix
3520 # happens to be about 1/5th of the way through the ring, so the
3521 # predictor thinks we'll have 5 shares and that we'll delete them
3522 # all. This part of the test depends upon the SIs landing right
3523 # where they do now.
3524 self.failUnlessIn("The remainder of this cycle is expected to "
3525 "recover: 4 shares, 4 buckets", s)
3526 self.failUnlessIn("The whole cycle is expected to examine "
3527 "5 shares in 5 buckets and to recover: "
3528 "5 shares, 5 buckets", s)
3529 d.addCallback(_check_html_in_cycle)
3531 # wait for the crawler to finish the first cycle. Two shares should
3534 return bool(lc.get_state()["last-cycle-finished"] is not None)
3535 d.addCallback(lambda ign: self.poll(_wait))
3537 def _after_first_cycle(ignored):
3538 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3539 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3540 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3541 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3542 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3543 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3546 last = s["history"][0]
3548 self.failUnlessEqual(last["expiration-enabled"], True)
3549 self.failUnlessEqual(last["configured-expiration-mode"],
3550 ("cutoff-date", None, then,
3551 ("mutable", "immutable")))
3552 self.failUnlessEqual(last["leases-per-share-histogram"],
3555 rec = last["space-recovered"]
3556 self.failUnlessEqual(rec["examined-buckets"], 4)
3557 self.failUnlessEqual(rec["examined-shares"], 4)
3558 self.failUnlessEqual(rec["actual-buckets"], 2)
3559 self.failUnlessEqual(rec["original-buckets"], 0)
3560 self.failUnlessEqual(rec["configured-buckets"], 2)
3561 self.failUnlessEqual(rec["actual-shares"], 2)
3562 self.failUnlessEqual(rec["original-shares"], 0)
3563 self.failUnlessEqual(rec["configured-shares"], 2)
3564 size = sf0_size + sf2_size
3565 self.failUnlessEqual(rec["actual-sharebytes"], size)
3566 self.failUnlessEqual(rec["original-sharebytes"], 0)
3567 self.failUnlessEqual(rec["configured-sharebytes"], size)
3568 # different platforms have different notions of "blocks used by
3569 # this file", so merely assert that it's a number
3570 self.failUnless(rec["actual-diskbytes"] >= 0,
3571 rec["actual-diskbytes"])
3572 self.failUnless(rec["original-diskbytes"] >= 0,
3573 rec["original-diskbytes"])
3574 self.failUnless(rec["configured-diskbytes"] >= 0,
3575 rec["configured-diskbytes"])
3576 d.addCallback(_after_first_cycle)
3577 d.addCallback(lambda ign: self.render1(webstatus))
3578 def _check_html(html):
3579 s = remove_tags(html)
3580 self.failUnlessIn("Expiration Enabled:"
3581 " expired leases will be removed", s)
3582 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3583 substr = "Leases created or last renewed before %s will be considered expired." % date
3584 self.failUnlessIn(substr, s)
3585 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3586 d.addCallback(_check_html)
3589 def test_only_immutable(self):
3590 basedir = "storage/LeaseCrawler/only_immutable"
3591 fileutil.make_dirs(basedir)
3593 then = int(now - 2000)
3594 ss = StorageServer(basedir, "\x00" * 20,
3595 expiration_enabled=True,
3596 expiration_mode="cutoff-date",
3597 expiration_cutoff_date=then,
3598 expiration_sharetypes=("immutable",))
3599 lc = ss.lease_checker
3601 webstatus = StorageStatus(ss)
3603 self.make_shares(ss)
3604 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3605 # set all leases to be expirable
3606 new_expiration_time = now - 3000 + 31*24*60*60
3608 def count_shares(si):
3609 return len(list(ss._iter_share_files(si)))
3610 def _get_sharefile(si):
3611 return list(ss._iter_share_files(si))[0]
3612 def count_leases(si):
3613 return len(list(_get_sharefile(si).get_leases()))
3615 sf0 = _get_sharefile(immutable_si_0)
3616 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3617 sf1 = _get_sharefile(immutable_si_1)
3618 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3619 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3620 sf2 = _get_sharefile(mutable_si_2)
3621 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3622 sf3 = _get_sharefile(mutable_si_3)
3623 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3624 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3626 ss.setServiceParent(self.s)
3628 return bool(lc.get_state()["last-cycle-finished"] is not None)
3629 d = self.poll(_wait)
3631 def _after_first_cycle(ignored):
3632 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3633 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3634 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3635 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3636 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3637 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3638 d.addCallback(_after_first_cycle)
3639 d.addCallback(lambda ign: self.render1(webstatus))
3640 def _check_html(html):
3641 s = remove_tags(html)
3642 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3643 d.addCallback(_check_html)
3646 def test_only_mutable(self):
3647 basedir = "storage/LeaseCrawler/only_mutable"
3648 fileutil.make_dirs(basedir)
3650 then = int(now - 2000)
3651 ss = StorageServer(basedir, "\x00" * 20,
3652 expiration_enabled=True,
3653 expiration_mode="cutoff-date",
3654 expiration_cutoff_date=then,
3655 expiration_sharetypes=("mutable",))
3656 lc = ss.lease_checker
3658 webstatus = StorageStatus(ss)
3660 self.make_shares(ss)
3661 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3662 # set all leases to be expirable
3663 new_expiration_time = now - 3000 + 31*24*60*60
3665 def count_shares(si):
3666 return len(list(ss._iter_share_files(si)))
3667 def _get_sharefile(si):
3668 return list(ss._iter_share_files(si))[0]
3669 def count_leases(si):
3670 return len(list(_get_sharefile(si).get_leases()))
3672 sf0 = _get_sharefile(immutable_si_0)
3673 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3674 sf1 = _get_sharefile(immutable_si_1)
3675 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3676 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3677 sf2 = _get_sharefile(mutable_si_2)
3678 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3679 sf3 = _get_sharefile(mutable_si_3)
3680 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3681 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3683 ss.setServiceParent(self.s)
3685 return bool(lc.get_state()["last-cycle-finished"] is not None)
3686 d = self.poll(_wait)
3688 def _after_first_cycle(ignored):
3689 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3690 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3691 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3692 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3693 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3694 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3695 d.addCallback(_after_first_cycle)
3696 d.addCallback(lambda ign: self.render1(webstatus))
3697 def _check_html(html):
3698 s = remove_tags(html)
3699 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3700 d.addCallback(_check_html)
3703 def test_bad_mode(self):
3704 basedir = "storage/LeaseCrawler/bad_mode"
3705 fileutil.make_dirs(basedir)
3706 e = self.failUnlessRaises(ValueError,
3707 StorageServer, basedir, "\x00" * 20,
3708 expiration_mode="bogus")
3709 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3711 def test_parse_duration(self):
3715 p = time_format.parse_duration
3716 self.failUnlessEqual(p("7days"), 7*DAY)
3717 self.failUnlessEqual(p("31day"), 31*DAY)
3718 self.failUnlessEqual(p("60 days"), 60*DAY)
3719 self.failUnlessEqual(p("2mo"), 2*MONTH)
3720 self.failUnlessEqual(p("3 month"), 3*MONTH)
3721 self.failUnlessEqual(p("2years"), 2*YEAR)
3722 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3723 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3725 def test_parse_date(self):
3726 p = time_format.parse_date
3727 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3728 self.failUnlessEqual(p("2009-03-18"), 1237334400)
3730 def test_limited_history(self):
3731 basedir = "storage/LeaseCrawler/limited_history"
3732 fileutil.make_dirs(basedir)
3733 ss = StorageServer(basedir, "\x00" * 20)
3734 # make it start sooner than usual.
3735 lc = ss.lease_checker
3739 # create a few shares, with some leases on them
3740 self.make_shares(ss)
3742 ss.setServiceParent(self.s)
3744 def _wait_until_15_cycles_done():
3745 last = lc.state["last-cycle-finished"]
3746 if last is not None and last >= 15:
3751 d = self.poll(_wait_until_15_cycles_done)
3753 def _check(ignored):
3756 self.failUnlessEqual(len(h), 10)
3757 self.failUnlessEqual(max(h.keys()), 15)
3758 self.failUnlessEqual(min(h.keys()), 6)
3759 d.addCallback(_check)
3762 def test_unpredictable_future(self):
3763 basedir = "storage/LeaseCrawler/unpredictable_future"
3764 fileutil.make_dirs(basedir)
3765 ss = StorageServer(basedir, "\x00" * 20)
3766 # make it start sooner than usual.
3767 lc = ss.lease_checker
3769 lc.cpu_slice = -1.0 # stop quickly
3771 self.make_shares(ss)
3773 ss.setServiceParent(self.s)
3775 d = fireEventually()
3776 def _check(ignored):
3777 # this should fire after the first bucket is complete, but before
3778 # the first prefix is complete, so the progress-measurer won't
3779 # think we've gotten far enough to raise our percent-complete
3780 # above 0%, triggering the cannot-predict-the-future code in
3781 # expirer.py . This will have to change if/when the
3782 # progress-measurer gets smart enough to count buckets (we'll
3783 # have to interrupt it even earlier, before it's finished the
3786 if "cycle-to-date" not in s:
3787 d2 = fireEventually()
3788 d2.addCallback(_check)
3790 self.failUnlessIn("cycle-to-date", s)
3791 self.failUnlessIn("estimated-remaining-cycle", s)
3792 self.failUnlessIn("estimated-current-cycle", s)
3794 left = s["estimated-remaining-cycle"]["space-recovered"]
3795 self.failUnlessEqual(left["actual-buckets"], None)
3796 self.failUnlessEqual(left["original-buckets"], None)
3797 self.failUnlessEqual(left["configured-buckets"], None)
3798 self.failUnlessEqual(left["actual-shares"], None)
3799 self.failUnlessEqual(left["original-shares"], None)
3800 self.failUnlessEqual(left["configured-shares"], None)
3801 self.failUnlessEqual(left["actual-diskbytes"], None)
3802 self.failUnlessEqual(left["original-diskbytes"], None)
3803 self.failUnlessEqual(left["configured-diskbytes"], None)
3804 self.failUnlessEqual(left["actual-sharebytes"], None)
3805 self.failUnlessEqual(left["original-sharebytes"], None)
3806 self.failUnlessEqual(left["configured-sharebytes"], None)
3808 full = s["estimated-remaining-cycle"]["space-recovered"]
3809 self.failUnlessEqual(full["actual-buckets"], None)
3810 self.failUnlessEqual(full["original-buckets"], None)
3811 self.failUnlessEqual(full["configured-buckets"], None)
3812 self.failUnlessEqual(full["actual-shares"], None)
3813 self.failUnlessEqual(full["original-shares"], None)
3814 self.failUnlessEqual(full["configured-shares"], None)
3815 self.failUnlessEqual(full["actual-diskbytes"], None)
3816 self.failUnlessEqual(full["original-diskbytes"], None)
3817 self.failUnlessEqual(full["configured-diskbytes"], None)
3818 self.failUnlessEqual(full["actual-sharebytes"], None)
3819 self.failUnlessEqual(full["original-sharebytes"], None)
3820 self.failUnlessEqual(full["configured-sharebytes"], None)
3822 d.addCallback(_check)
3825 def test_no_st_blocks(self):
3826 basedir = "storage/LeaseCrawler/no_st_blocks"
3827 fileutil.make_dirs(basedir)
3828 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3829 expiration_mode="age",
3830 expiration_override_lease_duration=-1000)
3831 # a negative expiration_time= means the "configured-"
3832 # space-recovered counts will be non-zero, since all shares will have
3835 # make it start sooner than usual.
3836 lc = ss.lease_checker
3839 self.make_shares(ss)
3840 ss.setServiceParent(self.s)
3842 return bool(lc.get_state()["last-cycle-finished"] is not None)
3843 d = self.poll(_wait)
3845 def _check(ignored):
3847 last = s["history"][0]
3848 rec = last["space-recovered"]
3849 self.failUnlessEqual(rec["configured-buckets"], 4)
3850 self.failUnlessEqual(rec["configured-shares"], 4)
3851 self.failUnless(rec["configured-sharebytes"] > 0,
3852 rec["configured-sharebytes"])
3853 # without the .st_blocks field in os.stat() results, we should be
3854 # reporting diskbytes==sharebytes
3855 self.failUnlessEqual(rec["configured-sharebytes"],
3856 rec["configured-diskbytes"])
3857 d.addCallback(_check)
3860 def test_share_corruption(self):
3861 self._poll_should_ignore_these_errors = [
3862 UnknownMutableContainerVersionError,
3863 UnknownImmutableContainerVersionError,
3865 basedir = "storage/LeaseCrawler/share_corruption"
3866 fileutil.make_dirs(basedir)
3867 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3868 w = StorageStatus(ss)
3869 # make it start sooner than usual.
3870 lc = ss.lease_checker
3871 lc.stop_after_first_bucket = True
3875 # create a few shares, with some leases on them
3876 self.make_shares(ss)
3878 # now corrupt one, and make sure the lease-checker keeps going
3879 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3880 first = min(self.sis)
3881 first_b32 = base32.b2a(first)
3882 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3885 f.write("BAD MAGIC")
3887 # if get_share_file() doesn't see the correct mutable magic, it
3888 # assumes the file is an immutable share, and then
3889 # immutable.ShareFile sees a bad version. So regardless of which kind
3890 # of share we corrupted, this will trigger an
3891 # UnknownImmutableContainerVersionError.
3893 # also create an empty bucket
3894 empty_si = base32.b2a("\x04"*16)
3895 empty_bucket_dir = os.path.join(ss.sharedir,
3896 storage_index_to_dir(empty_si))
3897 fileutil.make_dirs(empty_bucket_dir)
3899 ss.setServiceParent(self.s)
3901 d = fireEventually()
3903 # now examine the state right after the first bucket has been
3905 def _after_first_bucket(ignored):
3907 if "cycle-to-date" not in s:
3908 d2 = fireEventually()
3909 d2.addCallback(_after_first_bucket)
3911 so_far = s["cycle-to-date"]
3912 rec = so_far["space-recovered"]
3913 self.failUnlessEqual(rec["examined-buckets"], 1)
3914 self.failUnlessEqual(rec["examined-shares"], 0)
3915 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3916 d.addCallback(_after_first_bucket)
3918 d.addCallback(lambda ign: self.render_json(w))
3919 def _check_json(json):
3920 data = simplejson.loads(json)
3921 # grr. json turns all dict keys into strings.
3922 so_far = data["lease-checker"]["cycle-to-date"]
3923 corrupt_shares = so_far["corrupt-shares"]
3924 # it also turns all tuples into lists
3925 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3926 d.addCallback(_check_json)
3927 d.addCallback(lambda ign: self.render1(w))
3928 def _check_html(html):
3929 s = remove_tags(html)
3930 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3931 d.addCallback(_check_html)
3934 return bool(lc.get_state()["last-cycle-finished"] is not None)
3935 d.addCallback(lambda ign: self.poll(_wait))
3937 def _after_first_cycle(ignored):
3939 last = s["history"][0]
3940 rec = last["space-recovered"]
3941 self.failUnlessEqual(rec["examined-buckets"], 5)
3942 self.failUnlessEqual(rec["examined-shares"], 3)
3943 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3944 d.addCallback(_after_first_cycle)
3945 d.addCallback(lambda ign: self.render_json(w))
3946 def _check_json_history(json):
3947 data = simplejson.loads(json)
3948 last = data["lease-checker"]["history"]["0"]
3949 corrupt_shares = last["corrupt-shares"]
3950 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3951 d.addCallback(_check_json_history)
3952 d.addCallback(lambda ign: self.render1(w))
3953 def _check_html_history(html):
3954 s = remove_tags(html)
3955 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3956 d.addCallback(_check_html_history)
3959 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3960 UnknownImmutableContainerVersionError)
3965 def render_json(self, page):
3966 d = self.render1(page, args={"t": ["json"]})
3969 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3972 self.s = service.MultiService()
3973 self.s.startService()
3975 return self.s.stopService()
3977 def test_no_server(self):
3978 w = StorageStatus(None)
3979 html = w.renderSynchronously()
3980 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3982 def test_status(self):
3983 basedir = "storage/WebStatus/status"
3984 fileutil.make_dirs(basedir)
3985 nodeid = "\x00" * 20
3986 ss = StorageServer(basedir, nodeid)
3987 ss.setServiceParent(self.s)
3988 w = StorageStatus(ss, "nickname")
3990 def _check_html(html):
3991 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3992 s = remove_tags(html)
3993 self.failUnlessIn("Server Nickname: nickname", s)
3994 self.failUnlessIn("Server Nodeid: %s" % base32.b2a(nodeid), s)
3995 self.failUnlessIn("Accepting new shares: Yes", s)
3996 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3997 d.addCallback(_check_html)
3998 d.addCallback(lambda ign: self.render_json(w))
3999 def _check_json(json):
4000 data = simplejson.loads(json)
4002 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4003 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
4004 self.failUnlessIn("bucket-counter", data)
4005 self.failUnlessIn("lease-checker", data)
4006 d.addCallback(_check_json)
4009 def render_json(self, page):
4010 d = self.render1(page, args={"t": ["json"]})
4013 def test_status_no_disk_stats(self):
4014 def call_get_disk_stats(whichdir, reserved_space=0):
4015 raise AttributeError()
4016 self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4018 # Some platforms may have no disk stats API. Make sure the code can handle that
4019 # (test runs on all platforms).
4020 basedir = "storage/WebStatus/status_no_disk_stats"
4021 fileutil.make_dirs(basedir)
4022 ss = StorageServer(basedir, "\x00" * 20)
4023 ss.setServiceParent(self.s)
4024 w = StorageStatus(ss)
4025 html = w.renderSynchronously()
4026 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4027 s = remove_tags(html)
4028 self.failUnlessIn("Accepting new shares: Yes", s)
4029 self.failUnlessIn("Total disk space: ?", s)
4030 self.failUnlessIn("Space Available to Tahoe: ?", s)
4031 self.failUnless(ss.get_available_space() is None)
4033 def test_status_bad_disk_stats(self):
4034 def call_get_disk_stats(whichdir, reserved_space=0):
4036 self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4038 # If the API to get disk stats exists but a call to it fails, then the status should
4039 # show that no shares will be accepted, and get_available_space() should be 0.
4040 basedir = "storage/WebStatus/status_bad_disk_stats"
4041 fileutil.make_dirs(basedir)
4042 ss = StorageServer(basedir, "\x00" * 20)
4043 ss.setServiceParent(self.s)
4044 w = StorageStatus(ss)
4045 html = w.renderSynchronously()
4046 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4047 s = remove_tags(html)
4048 self.failUnlessIn("Accepting new shares: No", s)
4049 self.failUnlessIn("Total disk space: ?", s)
4050 self.failUnlessIn("Space Available to Tahoe: ?", s)
4051 self.failUnlessEqual(ss.get_available_space(), 0)
4053 def test_status_right_disk_stats(self):
4056 free_for_root = 4*GB
4057 free_for_nonroot = 3*GB
4060 basedir = "storage/WebStatus/status_right_disk_stats"
4061 fileutil.make_dirs(basedir)
4062 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved)
4063 expecteddir = ss.sharedir
4065 def call_get_disk_stats(whichdir, reserved_space=0):
4066 self.failUnlessEqual(whichdir, expecteddir)
4067 self.failUnlessEqual(reserved_space, reserved)
4068 used = total - free_for_root
4069 avail = max(free_for_nonroot - reserved_space, 0)
4072 'free_for_root': free_for_root,
4073 'free_for_nonroot': free_for_nonroot,
4077 self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4079 ss.setServiceParent(self.s)
4080 w = StorageStatus(ss)
4081 html = w.renderSynchronously()
4083 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4084 s = remove_tags(html)
4085 self.failUnlessIn("Total disk space: 5.00 GB", s)
4086 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4087 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4088 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4089 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4090 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4091 self.failUnlessEqual(ss.get_available_space(), 2*GB)
4093 def test_readonly(self):
4094 basedir = "storage/WebStatus/readonly"
4095 fileutil.make_dirs(basedir)
4096 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4097 ss.setServiceParent(self.s)
4098 w = StorageStatus(ss)
4099 html = w.renderSynchronously()
4100 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4101 s = remove_tags(html)
4102 self.failUnlessIn("Accepting new shares: No", s)
4104 def test_reserved(self):
4105 basedir = "storage/WebStatus/reserved"
4106 fileutil.make_dirs(basedir)
4107 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4108 ss.setServiceParent(self.s)
4109 w = StorageStatus(ss)
4110 html = w.renderSynchronously()
4111 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4112 s = remove_tags(html)
4113 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4115 def test_huge_reserved(self):
4116 basedir = "storage/WebStatus/reserved"
4117 fileutil.make_dirs(basedir)
4118 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4119 ss.setServiceParent(self.s)
4120 w = StorageStatus(ss)
4121 html = w.renderSynchronously()
4122 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4123 s = remove_tags(html)
4124 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4126 def test_util(self):
4127 w = StorageStatus(None)
4128 self.failUnlessEqual(w.render_space(None, None), "?")
4129 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4130 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4131 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4132 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4133 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)