1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
    # Fragment of a fake Foolscap "canary": records disconnect callbacks so
    # tests can invoke them by hand to simulate a client disconnect.
    # NOTE(review): the class header and the marker-allocation code (the `m`
    # used below) are missing from this copy -- confirm against the full file.
    def __init__(self, ignore_disconnectors=False):
        # When true, registration requests are presumably ignored -- the
        # branch that consults self.ignore is not visible here.
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Remember the callback keyed by a marker so it can be cancelled.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        del self.disconnectors[marker]
class FakeStatsProvider:
    # Minimal stand-in for the node's stats provider: accepts and discards
    # all updates so StorageServer can be constructed in isolation.
    def count(self, name, delta=1):
        """Discard the counter update (tests do not track stats)."""
    def register_producer(self, producer):
        """Discard the stats-producer registration."""
class Bucket(unittest.TestCase):
    """Direct tests of BucketWriter/BucketReader share-file handling."""
61 def make_workdir(self, name):
62 basedir = os.path.join("storage", "Bucket", name)
63 incoming = os.path.join(basedir, "tmp", "bucket")
64 final = os.path.join(basedir, "bucket")
65 fileutil.make_dirs(basedir)
66 fileutil.make_dirs(os.path.join(basedir, "tmp"))
67 return incoming, final
    # The TestCase itself plays the storage-server role for BucketWriter;
    # these are the callbacks a real StorageServer would provide.
    def bucket_writer_closed(self, bw, consumed):
        """No-op: a real server would account for the consumed bytes."""
    def add_latency(self, category, latency):
        """No-op latency recording."""
    def count(self, name, delta=1):
        """No-op counter update."""
        # Build a throwaway lease with random secrets expiring ~83 minutes
        # from now.  NOTE(review): the `def make_lease(self):` line and the
        # owner_num assignment are missing from this copy -- confirm
        # against the full file.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def test_create(self):
        """Write 82 bytes into a 200-byte bucket in four pieces."""
        incoming, final = self.make_workdir("test_create")
        # NOTE(review): the trailing BucketWriter argument(s) and the
        # closing remote_close() call are missing from this copy.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        """Write three blocks, then read them back via a BucketReader."""
        incoming, final = self.make_workdir("test_readwrite")
        # NOTE(review): the trailing BucketWriter argument(s) and the
        # remote_close() call are missing from this copy.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        # Now read from the final (closed) location.
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # NOTE(review): the `share_data = 'a'` assignment and a few other
        # lines appear to be missing from this copy -- confirm against the
        # full file.
        containerdata = struct.pack('>LLL', 1, 1, 1)
        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        ownernumber = struct.pack('>L', 0)
        renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
        share_file_data = containerdata + share_data + lease_data
        incoming, final = self.make_workdir("test_read_past_end_of_share_data")
        fileutil.write(final, share_file_data)
        # The reader only needs a server for stats callbacks, so a Mock will do.
        mockstorageserver = mock.Mock()
        br = BucketReader(mockstorageserver, final)
        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
        # The server must clamp reads to the share data, never exposing
        # the lease secrets that follow it in the container file.
        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)
        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)
    # Fragment of a fake remote reference: forwards callRemote("foo", ...)
    # to self.target.remote_foo(...) via defer.maybeDeferred while counting
    # read/write calls.  NOTE(review): the inner `def _call():` line, the
    # read_count increment, and the counter initialization are missing from
    # this copy -- confirm against the full file.
    def callRemote(self, methname, *args, **kwargs):
        meth = getattr(self.target, "remote_" + methname)
        return meth(*args, **kwargs)
        if methname == "slot_readv":
        if "writev" in methname:
            self.write_count += 1
        return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
    """Tests of the client-side Write/ReadBucketProxy layout wrappers."""
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        # NOTE(review): the remaining make_bucket lines (trailing
        # BucketWriter arguments, remote wrapping, return) are missing
        # from this copy -- confirm against the full file.
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    def make_lease(self):
        # NOTE(review): the owner_num assignment is missing from this copy.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    # Server-role callbacks: this TestCase doubles as the storage server.
    def bucket_writer_closed(self, bw, consumed):
        """No-op."""
    def add_latency(self, category, latency):
        """No-op."""
    def count(self, name, delta=1):
        """No-op."""
    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        # NOTE(review): the intervening WriteBucketProxy arguments are
        # missing from this copy.
        bp = WriteBucketProxy(rb, None,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
        # NOTE(review): the closing loop clauses of these three list
        # literals, the remaining wbp_class arguments, and the initial
        # `d = ...` are missing from this copy -- confirm against the full file.
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"
        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       uri_extension_size_max=len(uri_extension))
        # Write the whole share through the proxy, one section at a time.
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())
        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
            # Each section must round-trip exactly.
            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
        d.addCallback(_start_reading)
291 def test_readwrite_v1(self):
292 return self._do_test_readwrite("test_readwrite_v1",
293 0x24, WriteBucketProxy, ReadBucketProxy)
295 def test_readwrite_v2(self):
296 return self._do_test_readwrite("test_readwrite_v2",
297 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class Server(unittest.TestCase):
    """Tests of StorageServer's immutable-share remote API."""
    # NOTE(review): the setUp/tearDown def lines are missing from this
    # copy -- confirm against the full file.
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        # Monotonic counter used to derive fresh lease secrets per call.
        self._lease_secret = itertools.count()
        return self.sparent.stopService()
    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        # NOTE(review): the return statement is missing from this copy.
    def create(self, name, reserved_space=0, klass=StorageServer):
        """Build a StorageServer in a per-test workdir and start it."""
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        # NOTE(review): `return ss` appears to be missing from this copy.
    def test_create(self):
        """Constructing and starting a StorageServer must not blow up."""
        self.create("test_create")
322 def test_declares_fixed_1528(self):
323 ss = self.create("test_declares_fixed_1528")
324 ver = ss.remote_get_version()
325 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
326 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
328 def test_declares_maximum_share_sizes(self):
329 ss = self.create("test_declares_maximum_share_sizes")
330 ver = ss.remote_get_version()
331 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
332 self.failUnlessIn('maximum-immutable-share-size', sv1)
333 self.failUnlessIn('maximum-mutable-share-size', sv1)
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        """Helper: call remote_allocate_buckets with fresh lease secrets."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): a guard line (presumably `if not canary:`) is
        # missing from this copy -- confirm against the full file.
        canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        """A >4GiB share round-trips (needs sparse-file support)."""
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
        avail = fileutil.get_available_space('.', 512*2**20)
        # NOTE(review): the free-space guard condition line is missing
        # from this copy -- confirm against the full file.
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
        ss = self.create("test_large_share")
        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))
        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()
        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        # NOTE(review): the docstring delimiters, remote_close() loop
        # bodies, and the trailing os.path.join argument(s) are missing
        # from this copy -- confirm against the full file.
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))
        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        """Closing the last writer prunes the incoming/ scaffolding:
        bucket and prefix dirs go away, the top incoming/ dir stays."""
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): a remote_close() call appears to be missing
            # here in this copy.
        # `wb` is the last writer from the loop above.
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)
408 def test_abort(self):
409 # remote_abort, when called on a writer, should make sure that
410 # the allocated size of the bucket is not counted by the storage
411 # server when accounting for space.
412 ss = self.create("test_abort")
413 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
414 self.failIfEqual(ss.allocated_size(), 0)
416 # Now abort the writers.
417 for writer in writers.itervalues():
418 writer.remote_abort()
419 self.failUnlessEqual(ss.allocated_size(), 0)
    def test_allocate(self):
        # NOTE(review): several lines are missing from this copy
        # (remote_close()/remote_abort() loop bodies, the `b_str`
        # assignment) -- confirm against the full file.
        ss = self.create("test_allocate")
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # aborting a bucket that was already closed is a no-op
        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)
        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))
        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))
        for i,wb in writers2.items():
        for i,wb in writers.items():
    def test_bad_container_version(self):
        """A share file with container version 0 must raise
        UnknownImmutableContainerVersionError on read."""
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        # NOTE(review): the remote_close() call and the open/seek/close
        # lines around the version overwrite are missing from this copy.
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        # Reading an unrelated (nonexistent) index must still succeed.
        ss.remote_get_buckets("allocate")
        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))
    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # Fire the registered disconnect callbacks by hand.
        # NOTE(review): the loop body and a few intervening lines are
        # missing from this copy -- confirm against the full file.
        for (f,args,kwargs) in canary.disconnectors.values():
        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        # NOTE(review): this copy is missing several lines (the
        # reserved_space and OVERHEAD assignments, dict closers, and the
        # remote_abort()/remote_close() loops) -- confirm against the
        # full file.
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)
        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)
        # we abandon the first set, so their provisional allocation should be
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes
        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
        self.failUnlessEqual(len(ss._active_writers), 0)
        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # we have to manually increase available, since we're not doing real
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        # Demonstrates that mode="rb+" supports seek-past-end writes.
        # NOTE(review): the `def test_seek_behavior(self):` line and the
        # write/seek/close calls between the opens and the stat are
        # missing from this copy -- confirm against the full file.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        """Lease bookkeeping: one lease per upload, add-lease, renew."""
        # NOTE(review): this copy is missing several lines (the
        # sharenums/size setup and the remote_close() loop bodies) --
        # confirm against the full file.
        ss = self.create("test_leases")
        canary = FakeCanary()
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
        # A completed upload leaves exactly one lease, under rs0.
        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)
        # There is no such method as remote_cancel_lease for now -- see
        self.failIf(hasattr(ss, 'remote_cancel_lease'), \
                    "ss should not have a 'remote_cancel_lease' method/attribute")
        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        # A second allocation while the first is still open gets nothing.
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
690 def test_readonly(self):
691 workdir = self.workdir("test_readonly")
692 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
693 ss.setServiceParent(self.sparent)
695 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
696 self.failUnlessEqual(already, set())
697 self.failUnlessEqual(writers, {})
699 stats = ss.get_stats()
700 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
701 if "storage_server.disk_avail" in stats:
702 # Some platforms may not have an API to get disk stats.
703 # But if there are stats, readonly_storage means disk_avail=0
704 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): a remote_close() call appears to be missing
            # here in this copy.
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeroes (the rest of this comment is missing from this copy).
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        """Corruption advisories land as files in corruption-advisories/."""
        # NOTE(review): the f.read()/f.close() pairs that load each report
        # into `report` are missing from this copy -- confirm against the
        # full file.
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)
        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)
        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()
        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)
class MutableServer(unittest.TestCase):
    """Tests of StorageServer's mutable-slot remote API."""
    # NOTE(review): the setUp/tearDown def lines and the startService()
    # call are missing from this copy -- confirm against the full file.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()
    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        # NOTE(review): the return statement is missing from this copy.
    def create(self, name):
        """Build a StorageServer in a per-test workdir and start it."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        # NOTE(review): `return ss` appears to be missing from this copy.
    def test_create(self):
        """Constructing and starting a StorageServer must not blow up."""
        self.create("test_create")
    # Deterministic per-tag secrets, so tests can re-derive them later.
    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)
    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))
    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        """Helper: create empty mutable shares via an empty test-and-write."""
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        # One empty (no tests, no writes) entry per requested share number.
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        # NOTE(review): the remaining rstaraw arguments (the test-and-write
        # dict and the read vector) are missing from this copy.
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_bad_magic(self):
        """A mutable container with a corrupt magic raises
        UnknownMutableContainerVersionError on read."""
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        # NOTE(review): the lines that overwrite the share's magic (open,
        # write, close) are missing from this copy.
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))
    def test_container_size(self):
        """Container resizing: growth, over-large writes, truncation,
        hole-filling with zeroes, and deletion at new_length=0."""
        # NOTE(review): several continuation lines (allocate() arguments,
        # read vectors, new_length values, closing parens) are missing
        # from this copy -- confirm against the full file.
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # Trying to make the container too large (by sending a write vector
        # whose offset is too high) will raise an exception.
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(TOOBIG,data)], None)},
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:10]]})
        # Sending a new_length shorter than the current length truncates the
        answer = rstaraw("si1", secrets,
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})
        # Sending a new_length longer than the current length doesn't change
        answer = rstaraw("si1", secrets,
        assert answer == (True, {0:[],1:[],2:[]})
        read_answer = read("si1", [0], [(0, 20)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})
        # Sending a write vector whose start is after the end of the current
        # data doesn't reveal "whatever was there last time" (palimpsest),
        # but instead fills with zeroes.
        # To test this, we fill the data area with a recognizable pattern.
        pattern = ''.join([chr(i) for i in range(100)])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0, pattern)], None)},
        assert answer == (True, {0:[],1:[],2:[]})
        # Then truncate the data...
        answer = rstaraw("si1", secrets,
        assert answer == (True, {0:[],1:[],2:[]})
        # Just confirm that you get an empty string if you try to read from
        # past the (new) endpoint now.
        answer = rstaraw("si1", secrets,
        self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))
        # Then the extend the file by writing a vector which starts out past
        answer = rstaraw("si1", secrets,
                         {0: ([], [(50, 'hellothere')], None)},
        assert answer == (True, {0:[],1:[],2:[]})
        # Now if you read the stuff between 20 (where we earlier truncated)
        # and 50, it had better be all zeroes.
        answer = rstaraw("si1", secrets,
        self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))
        # Also see if the server explicitly declares that it supports this
        ver = ss.remote_get_version()
        storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
        self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
        # If the size is dropped to zero the share is deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})
    def test_allocate(self):
        """Freshly-allocated mutable shares read as empty; writes respect
        the write-enabler and test vectors."""
        # NOTE(review): several continuation lines (allocate() arguments,
        # expected read dicts, test-vector closers) are missing from this
        # copy -- confirm against the full file.
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)
        # A wrong write enabler must be rejected, naming the nodeid.
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        # A failed test vector must leave the data untouched.
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1000 def test_operators(self):
1001 # test operators, the data we're comparing is '11111' in all cases.
1002 # test both fail+pass, reset data after each one.
1003 ss = self.create("test_operators")
1005 secrets = ( self.write_enabler("we1"),
1006 self.renew_secret("we1"),
1007 self.cancel_secret("we1") )
1008 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1009 write = ss.remote_slot_testv_and_readv_and_writev
1010 read = ss.remote_slot_readv
1013 write("si1", secrets,
1014 {0: ([], [(0,data)], None)},
1020 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
1025 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1026 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1027 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
1030 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1035 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1036 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1039 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1044 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1045 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1049 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1054 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1055 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1058 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1063 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1064 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1067 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1072 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1073 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1077 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1082 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1083 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1086 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1091 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1092 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1096 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1101 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1102 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1105 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1110 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1111 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1115 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1120 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1121 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1124 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1129 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1130 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1133 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1138 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1139 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1143 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1148 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1149 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1152 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1157 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1158 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1161 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1166 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1167 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1170 # finally, test some operators against empty shares
1171 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1176 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1177 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1180 def test_readv(self):
1181 ss = self.create("test_readv")
1182 secrets = ( self.write_enabler("we1"),
1183 self.renew_secret("we1"),
1184 self.cancel_secret("we1") )
1185 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1186 write = ss.remote_slot_testv_and_readv_and_writev
1187 read = ss.remote_slot_readv
1188 data = [("%d" % i) * 100 for i in range(3)]
1189 rc = write("si1", secrets,
1190 {0: ([], [(0,data[0])], None),
1191 1: ([], [(0,data[1])], None),
1192 2: ([], [(0,data[2])], None),
1194 self.failUnlessEqual(rc, (True, {}))
1196 answer = read("si1", [], [(0, 10)])
1197 self.failUnlessEqual(answer, {0: ["0"*10],
1201 def compare_leases_without_timestamps(self, leases_a, leases_b):
1202 self.failUnlessEqual(len(leases_a), len(leases_b))
1203 for i in range(len(leases_a)):
1206 self.failUnlessEqual(a.owner_num, b.owner_num)
1207 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1208 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1209 self.failUnlessEqual(a.nodeid, b.nodeid)
1211 def compare_leases(self, leases_a, leases_b):
1212 self.failUnlessEqual(len(leases_a), len(leases_b))
1213 for i in range(len(leases_a)):
1216 self.failUnlessEqual(a.owner_num, b.owner_num)
1217 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1218 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1219 self.failUnlessEqual(a.nodeid, b.nodeid)
1220 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1222 def test_leases(self):
1223 ss = self.create("test_leases")
1225 return ( self.write_enabler("we1"),
1226 self.renew_secret("we1-%d" % n),
1227 self.cancel_secret("we1-%d" % n) )
1228 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1229 write = ss.remote_slot_testv_and_readv_and_writev
1230 read = ss.remote_slot_readv
1231 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1232 self.failUnlessEqual(rc, (True, {}))
1234 # create a random non-numeric file in the bucket directory, to
1235 # exercise the code that's supposed to ignore those.
1236 bucket_dir = os.path.join(self.workdir("test_leases"),
1237 "shares", storage_index_to_dir("si1"))
1238 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1239 f.write("you ought to be ignoring me\n")
1242 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1243 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1245 # add-lease on a missing storage index is silently ignored
1246 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1248 # re-allocate the slots and use the same secrets, that should update
1250 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1251 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1254 ss.remote_renew_lease("si1", secrets(0)[1])
1255 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1257 # now allocate them with a bunch of different secrets, to trigger the
1258 # extended lease code. Use add_lease for one of them.
1259 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1260 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1261 secrets2 = secrets(2)
1262 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1263 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1264 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1265 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1266 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1268 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1270 all_leases = list(s0.get_leases())
1271 # and write enough data to expand the container, forcing the server
1272 # to move the leases
1273 write("si1", secrets(0),
1274 {0: ([], [(0,data)], 200), },
1277 # read back the leases, make sure they're still intact.
1278 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1280 ss.remote_renew_lease("si1", secrets(0)[1])
1281 ss.remote_renew_lease("si1", secrets(1)[1])
1282 ss.remote_renew_lease("si1", secrets(2)[1])
1283 ss.remote_renew_lease("si1", secrets(3)[1])
1284 ss.remote_renew_lease("si1", secrets(4)[1])
1285 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1286 # get a new copy of the leases, with the current timestamps. Reading
1287 # data and failing to renew/cancel leases should leave the timestamps
1289 all_leases = list(s0.get_leases())
1290 # renewing with a bogus token should prompt an error message
1292 # examine the exception thus raised, make sure the old nodeid is
1293 # present, to provide for share migration
1294 e = self.failUnlessRaises(IndexError,
1295 ss.remote_renew_lease, "si1",
1298 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1299 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1300 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1302 self.compare_leases(all_leases, list(s0.get_leases()))
1304 # reading shares should not modify the timestamp
1305 read("si1", [], [(0,200)])
1306 self.compare_leases(all_leases, list(s0.get_leases()))
1308 write("si1", secrets(0),
1309 {0: ([], [(200, "make me bigger")], None)}, [])
1310 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1312 write("si1", secrets(0),
1313 {0: ([], [(500, "make me really bigger")], None)}, [])
1314 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1316 def test_remove(self):
1317 ss = self.create("test_remove")
1318 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1320 readv = ss.remote_slot_readv
1321 writev = ss.remote_slot_testv_and_readv_and_writev
1322 secrets = ( self.write_enabler("we1"),
1323 self.renew_secret("we1"),
1324 self.cancel_secret("we1") )
1325 # delete sh0 by setting its size to zero
1326 answer = writev("si1", secrets,
1329 # the answer should mention all the shares that existed before the
1331 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1332 # but a new read should show only sh1 and sh2
1333 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1336 # delete sh1 by setting its size to zero
1337 answer = writev("si1", secrets,
1340 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1341 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1344 # delete sh2 by setting its size to zero
1345 answer = writev("si1", secrets,
1348 self.failUnlessEqual(answer, (True, {2:[]}) )
1349 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1351 # and the bucket directory should now be gone
1352 si = base32.b2a("si1")
1353 # note: this is a detail of the storage server implementation, and
1354 # may change in the future
1356 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1357 bucketdir = os.path.join(prefixdir, si)
1358 self.failUnless(os.path.exists(prefixdir), prefixdir)
1359 self.failIf(os.path.exists(bucketdir), bucketdir)
1362 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1364 self.sparent = LoggingServiceParent()
1365 self._lease_secret = itertools.count()
1366 self.ss = self.create("MDMFProxies storage test server")
1367 self.rref = RemoteBucket()
1368 self.rref.target = self.ss
1369 self.secrets = (self.write_enabler("we_secret"),
1370 self.renew_secret("renew_secret"),
1371 self.cancel_secret("cancel_secret"))
1372 self.segment = "aaaaaa"
1374 self.salt = "a" * 16
1375 self.block_hash = "a" * 32
1376 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1377 self.share_hash = self.block_hash
1378 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1379 self.signature = "foobarbaz"
1380 self.verification_key = "vvvvvv"
1381 self.encprivkey = "private"
1382 self.root_hash = self.block_hash
1383 self.salt_hash = self.root_hash
1384 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1385 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1386 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1387 # blockhashes and salt hashes are serialized in the same way,
1388 # only we lop off the first element and store that in the
1390 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1394 self.sparent.stopService()
1395 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1398 def write_enabler(self, we_tag):
1399 return hashutil.tagged_hash("we_blah", we_tag)
1402 def renew_secret(self, tag):
1403 return hashutil.tagged_hash("renew_blah", str(tag))
1406 def cancel_secret(self, tag):
1407 return hashutil.tagged_hash("cancel_blah", str(tag))
1410 def workdir(self, name):
1411 basedir = os.path.join("storage", "MutableServer", name)
1415 def create(self, name):
1416 workdir = self.workdir(name)
1417 ss = StorageServer(workdir, "\x00" * 20)
1418 ss.setServiceParent(self.sparent)
1422 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1423 # Start with the checkstring
1424 data = struct.pack(">BQ32s",
1428 self.checkstring = data
1429 # Next, the encoding parameters
1431 data += struct.pack(">BBQQ",
1437 data += struct.pack(">BBQQ",
1443 data += struct.pack(">BBQQ",
1448 # Now we'll build the offsets.
1450 if not tail_segment and not empty:
1452 sharedata += self.salt + self.block
1455 sharedata += self.salt + self.block
1456 sharedata += self.salt + "a"
1458 # The encrypted private key comes after the shares + salts
1459 offset_size = struct.calcsize(MDMFOFFSETS)
1460 encrypted_private_key_offset = len(data) + offset_size
1461 # The share has chain comes after the private key
1462 sharehashes_offset = encrypted_private_key_offset + \
1463 len(self.encprivkey)
1465 # The signature comes after the share hash chain.
1466 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1468 verification_key_offset = signature_offset + len(self.signature)
1469 verification_key_end = verification_key_offset + \
1470 len(self.verification_key)
1472 share_data_offset = offset_size
1473 share_data_offset += PRIVATE_KEY_SIZE
1474 share_data_offset += SIGNATURE_SIZE
1475 share_data_offset += VERIFICATION_KEY_SIZE
1476 share_data_offset += SHARE_HASH_CHAIN_SIZE
1478 blockhashes_offset = share_data_offset + len(sharedata)
1479 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1481 data += struct.pack(MDMFOFFSETS,
1482 encrypted_private_key_offset,
1485 verification_key_offset,
1486 verification_key_end,
1492 self.offsets['enc_privkey'] = encrypted_private_key_offset
1493 self.offsets['block_hash_tree'] = blockhashes_offset
1494 self.offsets['share_hash_chain'] = sharehashes_offset
1495 self.offsets['signature'] = signature_offset
1496 self.offsets['verification_key'] = verification_key_offset
1497 self.offsets['share_data'] = share_data_offset
1498 self.offsets['verification_key_end'] = verification_key_end
1499 self.offsets['EOF'] = eof_offset
1502 data += self.encprivkey
1504 data += self.share_hash_chain_s
1506 data += self.signature
1507 # and the verification key
1508 data += self.verification_key
1509 # Then we'll add in gibberish until we get to the right point.
1510 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1513 # Then the share data
1516 data += self.block_hash_tree_s
1520 def write_test_share_to_server(self,
1525 I write some data for the read tests to read to self.ss
1527 If tail_segment=True, then I will write a share that has a
1528 smaller tail segment than other segments.
1530 write = self.ss.remote_slot_testv_and_readv_and_writev
1531 data = self.build_test_mdmf_share(tail_segment, empty)
1532 # Finally, we write the whole thing to the storage server in one
1534 testvs = [(0, 1, "eq", "")]
1536 tws[0] = (testvs, [(0, data)], None)
1538 results = write(storage_index, self.secrets, tws, readv)
1539 self.failUnless(results[0])
1542 def build_test_sdmf_share(self, empty=False):
1546 sharedata = self.segment * 6
1547 self.sharedata = sharedata
1548 blocksize = len(sharedata) / 3
1549 block = sharedata[:blocksize]
1550 self.blockdata = block
1551 prefix = struct.pack(">BQ32s16s BBQQ",
1561 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1562 signature_offset = post_offset + len(self.verification_key)
1563 sharehashes_offset = signature_offset + len(self.signature)
1564 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1565 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1566 encprivkey_offset = sharedata_offset + len(block)
1567 eof_offset = encprivkey_offset + len(self.encprivkey)
1568 offsets = struct.pack(">LLLLQQ",
1575 final_share = "".join([prefix,
1577 self.verification_key,
1579 self.share_hash_chain_s,
1580 self.block_hash_tree_s,
1584 self.offsets['signature'] = signature_offset
1585 self.offsets['share_hash_chain'] = sharehashes_offset
1586 self.offsets['block_hash_tree'] = blockhashes_offset
1587 self.offsets['share_data'] = sharedata_offset
1588 self.offsets['enc_privkey'] = encprivkey_offset
1589 self.offsets['EOF'] = eof_offset
1593 def write_sdmf_share_to_server(self,
1596 # Some tests need SDMF shares to verify that we can still
1597 # read them. This method writes one, which resembles but is not
1599 write = self.ss.remote_slot_testv_and_readv_and_writev
1600 share = self.build_test_sdmf_share(empty)
1601 testvs = [(0, 1, "eq", "")]
1603 tws[0] = (testvs, [(0, share)], None)
1605 results = write(storage_index, self.secrets, tws, readv)
1606 self.failUnless(results[0])
1609 def test_read(self):
1610 self.write_test_share_to_server("si1")
1611 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1612 # Check that every method equals what we expect it to.
1613 d = defer.succeed(None)
1614 def _check_block_and_salt((block, salt)):
1615 self.failUnlessEqual(block, self.block)
1616 self.failUnlessEqual(salt, self.salt)
1619 d.addCallback(lambda ignored, i=i:
1620 mr.get_block_and_salt(i))
1621 d.addCallback(_check_block_and_salt)
1623 d.addCallback(lambda ignored:
1624 mr.get_encprivkey())
1625 d.addCallback(lambda encprivkey:
1626 self.failUnlessEqual(self.encprivkey, encprivkey))
1628 d.addCallback(lambda ignored:
1629 mr.get_blockhashes())
1630 d.addCallback(lambda blockhashes:
1631 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1633 d.addCallback(lambda ignored:
1634 mr.get_sharehashes())
1635 d.addCallback(lambda sharehashes:
1636 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1638 d.addCallback(lambda ignored:
1640 d.addCallback(lambda signature:
1641 self.failUnlessEqual(signature, self.signature))
1643 d.addCallback(lambda ignored:
1644 mr.get_verification_key())
1645 d.addCallback(lambda verification_key:
1646 self.failUnlessEqual(verification_key, self.verification_key))
1648 d.addCallback(lambda ignored:
1650 d.addCallback(lambda seqnum:
1651 self.failUnlessEqual(seqnum, 0))
1653 d.addCallback(lambda ignored:
1655 d.addCallback(lambda root_hash:
1656 self.failUnlessEqual(self.root_hash, root_hash))
1658 d.addCallback(lambda ignored:
1660 d.addCallback(lambda seqnum:
1661 self.failUnlessEqual(0, seqnum))
1663 d.addCallback(lambda ignored:
1664 mr.get_encoding_parameters())
1665 def _check_encoding_parameters((k, n, segsize, datalen)):
1666 self.failUnlessEqual(k, 3)
1667 self.failUnlessEqual(n, 10)
1668 self.failUnlessEqual(segsize, 6)
1669 self.failUnlessEqual(datalen, 36)
1670 d.addCallback(_check_encoding_parameters)
1672 d.addCallback(lambda ignored:
1673 mr.get_checkstring())
1674 d.addCallback(lambda checkstring:
1675 self.failUnlessEqual(checkstring, checkstring))
1679 def test_read_with_different_tail_segment_size(self):
1680 self.write_test_share_to_server("si1", tail_segment=True)
1681 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1682 d = mr.get_block_and_salt(5)
1683 def _check_tail_segment(results):
1684 block, salt = results
1685 self.failUnlessEqual(len(block), 1)
1686 self.failUnlessEqual(block, "a")
1687 d.addCallback(_check_tail_segment)
1691 def test_get_block_with_invalid_segnum(self):
1692 self.write_test_share_to_server("si1")
1693 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1694 d = defer.succeed(None)
1695 d.addCallback(lambda ignored:
1696 self.shouldFail(LayoutInvalid, "test invalid segnum",
1698 mr.get_block_and_salt, 7))
1702 def test_get_encoding_parameters_first(self):
1703 self.write_test_share_to_server("si1")
1704 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1705 d = mr.get_encoding_parameters()
1706 def _check_encoding_parameters((k, n, segment_size, datalen)):
1707 self.failUnlessEqual(k, 3)
1708 self.failUnlessEqual(n, 10)
1709 self.failUnlessEqual(segment_size, 6)
1710 self.failUnlessEqual(datalen, 36)
1711 d.addCallback(_check_encoding_parameters)
1715 def test_get_seqnum_first(self):
1716 self.write_test_share_to_server("si1")
1717 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1719 d.addCallback(lambda seqnum:
1720 self.failUnlessEqual(seqnum, 0))
1724 def test_get_root_hash_first(self):
1725 self.write_test_share_to_server("si1")
1726 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1727 d = mr.get_root_hash()
1728 d.addCallback(lambda root_hash:
1729 self.failUnlessEqual(root_hash, self.root_hash))
1733 def test_get_checkstring_first(self):
1734 self.write_test_share_to_server("si1")
1735 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1736 d = mr.get_checkstring()
1737 d.addCallback(lambda checkstring:
1738 self.failUnlessEqual(checkstring, self.checkstring))
1742 def test_write_read_vectors(self):
1743 # When writing for us, the storage server will return to us a
1744 # read vector, along with its result. If a write fails because
1745 # the test vectors failed, this read vector can help us to
1746 # diagnose the problem. This test ensures that the read vector
1747 # is working appropriately.
1748 mw = self._make_new_mw("si1", 0)
1751 mw.put_block(self.block, i, self.salt)
1752 mw.put_encprivkey(self.encprivkey)
1753 mw.put_blockhashes(self.block_hash_tree)
1754 mw.put_sharehashes(self.share_hash_chain)
1755 mw.put_root_hash(self.root_hash)
1756 mw.put_signature(self.signature)
1757 mw.put_verification_key(self.verification_key)
1758 d = mw.finish_publishing()
1760 self.failUnless(len(results), 2)
1761 result, readv = results
1762 self.failUnless(result)
1764 self.old_checkstring = mw.get_checkstring()
1765 mw.set_checkstring("")
1766 d.addCallback(_then)
1767 d.addCallback(lambda ignored:
1768 mw.finish_publishing())
1769 def _then_again(results):
1770 self.failUnlessEqual(len(results), 2)
1771 result, readvs = results
1773 self.failUnlessIn(0, readvs)
1774 readv = readvs[0][0]
1775 self.failUnlessEqual(readv, self.old_checkstring)
1776 d.addCallback(_then_again)
1777 # The checkstring remains the same for the rest of the process.
1781 def test_private_key_after_share_hash_chain(self):
1782 mw = self._make_new_mw("si1", 0)
1783 d = defer.succeed(None)
1785 d.addCallback(lambda ignored, i=i:
1786 mw.put_block(self.block, i, self.salt))
1787 d.addCallback(lambda ignored:
1788 mw.put_encprivkey(self.encprivkey))
1789 d.addCallback(lambda ignored:
1790 mw.put_sharehashes(self.share_hash_chain))
1792 # Now try to put the private key again.
1793 d.addCallback(lambda ignored:
1794 self.shouldFail(LayoutInvalid, "test repeat private key",
1796 mw.put_encprivkey, self.encprivkey))
1800 def test_signature_after_verification_key(self):
1801 mw = self._make_new_mw("si1", 0)
1802 d = defer.succeed(None)
1803 # Put everything up to and including the verification key.
1805 d.addCallback(lambda ignored, i=i:
1806 mw.put_block(self.block, i, self.salt))
1807 d.addCallback(lambda ignored:
1808 mw.put_encprivkey(self.encprivkey))
1809 d.addCallback(lambda ignored:
1810 mw.put_blockhashes(self.block_hash_tree))
1811 d.addCallback(lambda ignored:
1812 mw.put_sharehashes(self.share_hash_chain))
1813 d.addCallback(lambda ignored:
1814 mw.put_root_hash(self.root_hash))
1815 d.addCallback(lambda ignored:
1816 mw.put_signature(self.signature))
1817 d.addCallback(lambda ignored:
1818 mw.put_verification_key(self.verification_key))
1819 # Now try to put the signature again. This should fail
1820 d.addCallback(lambda ignored:
1821 self.shouldFail(LayoutInvalid, "signature after verification",
1823 mw.put_signature, self.signature))
1827 def test_uncoordinated_write(self):
1828 # Make two mutable writers, both pointing to the same storage
1829 # server, both at the same storage index, and try writing to the
1831 mw1 = self._make_new_mw("si1", 0)
1832 mw2 = self._make_new_mw("si1", 0)
1834 def _check_success(results):
1835 result, readvs = results
1836 self.failUnless(result)
1838 def _check_failure(results):
1839 result, readvs = results
1842 def _write_share(mw):
1844 mw.put_block(self.block, i, self.salt)
1845 mw.put_encprivkey(self.encprivkey)
1846 mw.put_blockhashes(self.block_hash_tree)
1847 mw.put_sharehashes(self.share_hash_chain)
1848 mw.put_root_hash(self.root_hash)
1849 mw.put_signature(self.signature)
1850 mw.put_verification_key(self.verification_key)
1851 return mw.finish_publishing()
1852 d = _write_share(mw1)
1853 d.addCallback(_check_success)
1854 d.addCallback(lambda ignored:
1856 d.addCallback(_check_failure)
1860 def test_invalid_salt_size(self):
1861 # Salts need to be 16 bytes in size. Writes that attempt to
1862 # write more or less than this should be rejected.
1863 mw = self._make_new_mw("si1", 0)
1864 invalid_salt = "a" * 17 # 17 bytes
1865 another_invalid_salt = "b" * 15 # 15 bytes
1866 d = defer.succeed(None)
1867 d.addCallback(lambda ignored:
1868 self.shouldFail(LayoutInvalid, "salt too big",
1870 mw.put_block, self.block, 0, invalid_salt))
1871 d.addCallback(lambda ignored:
1872 self.shouldFail(LayoutInvalid, "salt too small",
1874 mw.put_block, self.block, 0,
1875 another_invalid_salt))
1879 def test_write_test_vectors(self):
1880 # If we give the write proxy a bogus test vector at
1881 # any point during the process, it should fail to write when we
1883 def _check_failure(results):
1884 self.failUnlessEqual(len(results), 2)
1888 def _check_success(results):
1889 self.failUnlessEqual(len(results), 2)
1891 self.failUnless(results)
1893 mw = self._make_new_mw("si1", 0)
1894 mw.set_checkstring("this is a lie")
1896 mw.put_block(self.block, i, self.salt)
1897 mw.put_encprivkey(self.encprivkey)
1898 mw.put_blockhashes(self.block_hash_tree)
1899 mw.put_sharehashes(self.share_hash_chain)
1900 mw.put_root_hash(self.root_hash)
1901 mw.put_signature(self.signature)
1902 mw.put_verification_key(self.verification_key)
1903 d = mw.finish_publishing()
1904 d.addCallback(_check_failure)
1905 d.addCallback(lambda ignored:
1906 mw.set_checkstring(""))
1907 d.addCallback(lambda ignored:
1908 mw.finish_publishing())
1909 d.addCallback(_check_success)
1913 def serialize_blockhashes(self, blockhashes):
1914 return "".join(blockhashes)
1917 def serialize_sharehashes(self, sharehashes):
1918 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1919 for i in sorted(sharehashes.keys())])
1923 def test_write(self):
1924 # This translates to a file with 6 6-byte segments, and with 2-byte
1926 mw = self._make_new_mw("si1", 0)
1927 # Test writing some blocks.
1928 read = self.ss.remote_slot_readv
1929 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1930 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1931 PRIVATE_KEY_SIZE + \
1933 VERIFICATION_KEY_SIZE + \
1934 SHARE_HASH_CHAIN_SIZE
1935 written_block_size = 2 + len(self.salt)
1936 written_block = self.block + self.salt
1938 mw.put_block(self.block, i, self.salt)
1940 mw.put_encprivkey(self.encprivkey)
1941 mw.put_blockhashes(self.block_hash_tree)
1942 mw.put_sharehashes(self.share_hash_chain)
1943 mw.put_root_hash(self.root_hash)
1944 mw.put_signature(self.signature)
1945 mw.put_verification_key(self.verification_key)
1946 d = mw.finish_publishing()
1947 def _check_publish(results):
1948 self.failUnlessEqual(len(results), 2)
1949 result, ign = results
1950 self.failUnless(result, "publish failed")
1952 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1953 {0: [written_block]})
1955 self.failUnlessEqual(len(self.encprivkey), 7)
1956 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1957 {0: [self.encprivkey]})
1959 expected_block_hash_offset = expected_sharedata_offset + \
1960 (6 * written_block_size)
1961 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1962 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1963 {0: [self.block_hash_tree_s]})
1965 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1966 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1967 {0: [self.share_hash_chain_s]})
1969 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1970 {0: [self.root_hash]})
1971 expected_signature_offset = expected_share_hash_offset + \
1972 len(self.share_hash_chain_s)
1973 self.failUnlessEqual(len(self.signature), 9)
1974 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1975 {0: [self.signature]})
1977 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1978 self.failUnlessEqual(len(self.verification_key), 6)
1979 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1980 {0: [self.verification_key]})
1982 signable = mw.get_signable()
1983 verno, seq, roothash, k, n, segsize, datalen = \
1984 struct.unpack(">BQ32sBBQQ",
1986 self.failUnlessEqual(verno, 1)
1987 self.failUnlessEqual(seq, 0)
1988 self.failUnlessEqual(roothash, self.root_hash)
1989 self.failUnlessEqual(k, 3)
1990 self.failUnlessEqual(n, 10)
1991 self.failUnlessEqual(segsize, 6)
1992 self.failUnlessEqual(datalen, 36)
1993 expected_eof_offset = expected_block_hash_offset + \
1994 len(self.block_hash_tree_s)
1996 # Check the version number to make sure that it is correct.
1997 expected_version_number = struct.pack(">B", 1)
1998 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1999 {0: [expected_version_number]})
2000 # Check the sequence number to make sure that it is correct
2001 expected_sequence_number = struct.pack(">Q", 0)
2002 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2003 {0: [expected_sequence_number]})
2004 # Check that the encoding parameters (k, N, segement size, data
2005 # length) are what they should be. These are 3, 10, 6, 36
2006 expected_k = struct.pack(">B", 3)
2007 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2009 expected_n = struct.pack(">B", 10)
2010 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2012 expected_segment_size = struct.pack(">Q", 6)
2013 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2014 {0: [expected_segment_size]})
2015 expected_data_length = struct.pack(">Q", 36)
2016 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2017 {0: [expected_data_length]})
2018 expected_offset = struct.pack(">Q", expected_private_key_offset)
2019 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2020 {0: [expected_offset]})
2021 expected_offset = struct.pack(">Q", expected_share_hash_offset)
2022 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2023 {0: [expected_offset]})
2024 expected_offset = struct.pack(">Q", expected_signature_offset)
2025 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2026 {0: [expected_offset]})
2027 expected_offset = struct.pack(">Q", expected_verification_key_offset)
2028 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2029 {0: [expected_offset]})
2030 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2031 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2032 {0: [expected_offset]})
2033 expected_offset = struct.pack(">Q", expected_sharedata_offset)
2034 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2035 {0: [expected_offset]})
2036 expected_offset = struct.pack(">Q", expected_block_hash_offset)
2037 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2038 {0: [expected_offset]})
2039 expected_offset = struct.pack(">Q", expected_eof_offset)
2040 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2041 {0: [expected_offset]})
2042 d.addCallback(_check_publish)
def _make_new_mw(self, si, share, datalength=36):
    """Build an MDMFSlotWriteProxy for share number *share* of slot *si*.

    The defaults (datalength=36, k=3, N=10, seqnum=0) match the canned
    test vectors used throughout this test class.
    """
    # This is a file of size 36 bytes. Since it has a segment
    # size of 6, we know that it has 6 byte segments, which will
    # be split into blocks of 2 bytes because our FEC k
    # NOTE(review): the remaining constructor arguments and the return
    # statement are missing from this excerpt -- confirm against upstream.
    mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
def test_write_rejected_with_too_many_blocks(self):
    """Writing a block index past the last segment must raise LayoutInvalid."""
    mw = self._make_new_mw("si0", 0)
    # Try writing too many blocks. We should not be able to write
    # blocks into each share.
    d = defer.succeed(None)
    # NOTE(review): the "for i in ..." loop that binds `i` is missing
    # from this excerpt; `i` is otherwise unbound here.
    d.addCallback(lambda ignored, i=i:
        mw.put_block(self.block, i, self.salt))
    # Index 7 is past the end of a 6-segment file (indexes 0..6 incl. tail).
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "too many blocks",
                        mw.put_block, self.block, 7, self.salt))
def test_write_rejected_with_invalid_salt(self):
    """A salt that is not exactly 16 bytes must be rejected with LayoutInvalid."""
    # Try writing an invalid salt. Salts are 16 bytes -- any more or
    # less should cause an error.
    mw = self._make_new_mw("si1", 0)
    bad_salt = "a" * 17 # 17 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test_invalid_salt",
                        None, mw.put_block, self.block, 7, bad_salt))
def test_write_rejected_with_invalid_root_hash(self):
    """put_root_hash must reject a hash that is not 32 bytes (SHA256d-sized)."""
    # Try writing an invalid root hash. This should be SHA256d, and
    # 32 bytes long as a result.
    mw = self._make_new_mw("si2", 0)
    # 17 bytes != 32 bytes
    invalid_root_hash = "a" * 17
    d = defer.succeed(None)
    # Before this test can work, we need to put some blocks + salts,
    # a block hash tree, and a share hash tree. Otherwise, we'll see
    # failures that match what we are looking for, but are caused by
    # the constraints imposed on operation ordering.
    # NOTE(review): the loop binding `i` is missing from this excerpt.
    d.addCallback(lambda ignored, i=i:
        mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))
    # Now the ordering constraints are satisfied; this failure can only
    # be caused by the bad hash length.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "invalid root hash",
                        None, mw.put_root_hash, invalid_root_hash))
def test_write_rejected_with_invalid_blocksize(self):
    """Blocks must match the 2-byte blocksize, except for the tail segment."""
    # The blocksize implied by the writer that we get from
    # _make_new_mw is 2bytes -- any more or any less than this
    # should be cause for failure, unless it is the tail segment, in
    # which case it may not be failure.
    mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
    # NOTE(review): the definitions of `invalid_block` and (later)
    # `valid_block` are missing from this excerpt.
    # 1 bytes != 2 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored, invalid_block=invalid_block:
        self.shouldFail(LayoutInvalid, "test blocksize too small",
                        None, mw.put_block, invalid_block, 0,
    invalid_block = invalid_block * 3
    # 3 bytes != 2 bytes
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test blocksize too large",
                        mw.put_block, invalid_block, 0, self.salt))
    d.addCallback(lambda ignored, i=i:
        mw.put_block(self.block, i, self.salt))
    # Try to put an invalid tail segment
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test invalid tail segment",
                        mw.put_block, self.block, 5, self.salt))
    # A correctly-sized tail segment is accepted.
    d.addCallback(lambda ignored:
        mw.put_block(valid_block, 5, self.salt))
def test_write_enforces_order_constraints(self):
    """The MDMF writer must enforce its documented write ordering.

    Out-of-order writes (e.g. share hashes before the private key, or a
    signature before the root hash) must raise LayoutInvalid.
    """
    # We require that the MDMFSlotWriteProxy be interacted with in a
    # 1: write blocks and salts
    # 2: Write the encrypted private key
    # 3: Write the block hashes
    # 4: Write the share hashes
    # 5: Write the root hash and salt hash
    # 6: Write the signature and verification key
    # 7: Write the file.
    # Some of these can be performed out-of-order, and some can't.
    # The dependencies that I want to test here are:
    #  - Private key before block hashes
    #  - share hashes and block hashes before root hash
    #  - root hash before signature
    #  - signature before verification key
    mw0 = self._make_new_mw("si0", 0)
    d = defer.succeed(None)
    # NOTE(review): the loop binding `i` is missing from this excerpt.
    d.addCallback(lambda ignored, i=i:
        mw0.put_block(self.block, i, self.salt))
    # Try to write the share hash chain without writing the
    # encrypted private key
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "share hash chain before "
                        mw0.put_sharehashes, self.share_hash_chain))
    # Write the private key.
    d.addCallback(lambda ignored:
        mw0.put_encprivkey(self.encprivkey))
    # Now write the block hashes and try again
    d.addCallback(lambda ignored:
        mw0.put_blockhashes(self.block_hash_tree))
    # We haven't yet put the root hash on the share, so we shouldn't
    # be able to sign it.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "signature before root hash",
                        None, mw0.put_signature, self.signature))
    d.addCallback(lambda ignored:
        self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
    # ..and, since that fails, we also shouldn't be able to put the
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "key before signature",
                        None, mw0.put_verification_key,
                        self.verification_key))
    # Now write the share hashes.
    d.addCallback(lambda ignored:
        mw0.put_sharehashes(self.share_hash_chain))
    # We should be able to write the root hash now too
    d.addCallback(lambda ignored:
        mw0.put_root_hash(self.root_hash))
    # We should still be unable to put the verification key
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "key before signature",
                        None, mw0.put_verification_key,
                        self.verification_key))
    d.addCallback(lambda ignored:
        mw0.put_signature(self.signature))
    # We shouldn't be able to write the offsets to the remote server
    # until the offset table is finished; IOW, until we have written
    # the verification key.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "offsets before verification key",
                        mw0.finish_publishing))
    # With the verification key in place, publishing can complete.
    d.addCallback(lambda ignored:
        mw0.put_verification_key(self.verification_key))
def test_end_to_end(self):
    """Round-trip: write a full MDMF share, then read every field back.

    Each value read through MDMFSlotReadProxy must equal the value that
    was written through MDMFSlotWriteProxy.
    """
    mw = self._make_new_mw("si1", 0)
    # Write a share using the mutable writer, and make sure that the
    # reader knows how to read everything back to us.
    d = defer.succeed(None)
    # NOTE(review): loop lines binding `i` (here and before the
    # get_block_and_salt check below) are missing from this excerpt.
    d.addCallback(lambda ignored, i=i:
        mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
        mw.put_root_hash(self.root_hash))
    d.addCallback(lambda ignored:
        mw.put_signature(self.signature))
    d.addCallback(lambda ignored:
        mw.put_verification_key(self.verification_key))
    d.addCallback(lambda ignored:
        mw.finish_publishing())
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    def _check_block_and_salt((block, salt)):
        # Every segment was written with the same canned block and salt.
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)
    d.addCallback(lambda ignored, i=i:
        mr.get_block_and_salt(i))
    d.addCallback(_check_block_and_salt)
    d.addCallback(lambda ignored:
        mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
        self.failUnlessEqual(self.encprivkey, encprivkey))
    d.addCallback(lambda ignored:
        mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
        self.failUnlessEqual(self.block_hash_tree, blockhashes))
    d.addCallback(lambda ignored:
        mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
        self.failUnlessEqual(self.share_hash_chain, sharehashes))
    # NOTE(review): the reader calls between these addCallback pairs
    # (get_signature, get_seqnum, get_root_hash) are trimmed here.
    d.addCallback(lambda ignored:
    d.addCallback(lambda signature:
        self.failUnlessEqual(signature, self.signature))
    d.addCallback(lambda ignored:
        mr.get_verification_key())
    d.addCallback(lambda verification_key:
        self.failUnlessEqual(verification_key, self.verification_key))
    d.addCallback(lambda ignored:
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0))
    d.addCallback(lambda ignored:
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(self.root_hash, root_hash))
    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters((k, n, segsize, datalen)):
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)
    d.addCallback(lambda ignored:
        mr.get_checkstring())
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, mw.get_checkstring()))
def test_is_sdmf(self):
    """is_sdmf() must report True for a share written in SDMF format."""
    # The MDMFSlotReadProxy should also know how to read SDMF files,
    # since it will encounter them on the grid. Callers use the
    # is_sdmf method to test this.
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # NOTE(review): the line assigning `d` (presumably mr.is_sdmf()) is
    # missing from this excerpt.
    d.addCallback(lambda issdmf:
        self.failUnless(issdmf))
def test_reads_sdmf(self):
    """The read proxy must return correct field values for an SDMF share."""
    # The slot read proxy should, naturally, know how to tell us
    # about data in the SDMF format
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): several continuation/call lines are missing from
    # this excerpt (is_sdmf(), expected-value arguments, get_signature,
    # get_seqnum, get_root_hash) -- confirm against upstream.
    d.addCallback(lambda ignored:
    d.addCallback(lambda issdmf:
        self.failUnless(issdmf))
    # What do we need to read?
    d.addCallback(lambda ignored:
        mr.get_block_and_salt(0))
    def _check_block_and_salt(results):
        block, salt = results
        # Our original file is 36 bytes long. Then each share is 12
        # bytes in size. The share is composed entirely of the
        # letter a. self.block contains 2 as, so 6 * self.block is
        # what we are looking for.
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
    d.addCallback(_check_block_and_salt)
    d.addCallback(lambda ignored:
        mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
        self.failUnlessEqual(self.block_hash_tree,
    d.addCallback(lambda ignored:
        mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
        self.failUnlessEqual(self.share_hash_chain,
    d.addCallback(lambda ignored:
        mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
        self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
    d.addCallback(lambda ignored:
        mr.get_verification_key())
    d.addCallback(lambda verification_key:
        self.failUnlessEqual(verification_key,
                             self.verification_key,
    d.addCallback(lambda ignored:
    d.addCallback(lambda signature:
        self.failUnlessEqual(signature, self.signature, signature))
    # - The sequence number
    d.addCallback(lambda ignored:
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0, seqnum))
    d.addCallback(lambda ignored:
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(root_hash, self.root_hash, root_hash))
def test_only_reads_one_segment_sdmf(self):
    """Requesting segment 1 of a one-segment SDMF share must raise LayoutInvalid."""
    # SDMF shares have only one segment, so it doesn't make sense to
    # read more segments than that. The reader should know this and
    # complain if we try to do that.
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): the is_sdmf() call line is missing from this excerpt.
    d.addCallback(lambda ignored:
    d.addCallback(lambda issdmf:
        self.failUnless(issdmf))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test bad segment",
                        mr.get_block_and_salt, 1))
def test_read_with_prefetched_mdmf_data(self):
    """Prefetched MDMF header data must satisfy verinfo reads without RPCs.

    With 123 bytes prefilled, get_verinfo() must not touch the wire
    (read_count stays 0); reading a block still requires remote reads.
    """
    # The MDMFSlotReadProxy will prefill certain fields if you pass
    # it data that you have already fetched. This is useful for
    # cases like the Servermap, which prefetches ~2kb of data while
    # finding out which shares are on the remote peer so that it
    # doesn't waste round trips.
    mdmf_data = self.build_test_mdmf_share()
    self.write_test_share_to_server("si1")
    def _make_mr(ignored, length):
        # Build a reader primed with the first `length` bytes of the share.
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
    d = defer.succeed(None)
    # This should be enough to fill in both the encoding parameters
    # and the table of offsets, which will complete the version
    # information tuple.
    d.addCallback(_make_mr, 123)
    # NOTE(review): the get_verinfo() call, the verinfo tuple unpacking,
    # and the struct.pack argument lines are missing from this excerpt.
    d.addCallback(lambda mr:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
        self.failUnlessEqual(expected_prefix, prefix)
        # No remote reads should have been needed.
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)
    # This is not enough data to read a block and a share, so the
    # wrapper should attempt to read this from the remote server.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    def _check_block_and_salt((block, salt)):
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(self.rref.read_count, 1)
    # This should be enough data to read one block.
    d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
def test_read_with_prefetched_sdmf_data(self):
    """Prefetched SDMF header data must satisfy verinfo reads without RPCs."""
    sdmf_data = self.build_test_sdmf_share()
    self.write_sdmf_share_to_server("si1")
    def _make_mr(ignored, length):
        # Build a reader primed with the first `length` bytes of the share.
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
    d = defer.succeed(None)
    # This should be enough to get us the encoding parameters,
    # offset table, and everything else we need to build a verinfo
    d.addCallback(_make_mr, 123)
    # NOTE(review): the get_verinfo() call, verinfo tuple unpacking, and
    # struct.pack argument lines are missing from this excerpt.
    d.addCallback(lambda mr:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(SIGNED_PREFIX,
        self.failUnlessEqual(expected_prefix, prefix)
        # No remote reads should have been needed for the verinfo.
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)
    # This shouldn't be enough to read any share data.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    def _check_block_and_salt((block, salt)):
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
        # TODO: Fix the read routine so that it reads only the data
        #       that it has cached if it can't read all of it.
        self.failUnlessEqual(self.rref.read_count, 2)
    # This should be enough to read share data.
    d.addCallback(_make_mr, self.offsets['share_data'])
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
def test_read_with_empty_mdmf_file(self):
    """An empty MDMF share yields (3, 10, 0, 0) params and no readable blocks."""
    # Some tests upload a file with no contents to test things
    # unrelated to the actual handling of the content of the file.
    # The reader should behave intelligently in these cases.
    self.write_test_share_to_server("si1", empty=True)
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the encoding parameters, and they
    # should be correct.
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # Empty file: zero-length segments and data.
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)
    # We should not be able to fetch a block, since there are no
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "get block on empty file",
                        mr.get_block_and_salt, 0))
def test_read_with_empty_sdmf_file(self):
    """An empty SDMF share yields (3, 10, 0, 0) params and no readable blocks."""
    self.write_sdmf_share_to_server("si1", empty=True)
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the encoding parameters, and they
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # Empty file: zero-length segments and data.
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)
    # It does not make sense to get a block in this format, so we
    # should not be able to.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "get block on an empty file",
                        mr.get_block_and_salt, 0))
def test_verinfo_with_sdmf_file(self):
    """get_verinfo() on an SDMF share must match the canned test values."""
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the version information.
    d = defer.succeed(None)
    # NOTE(review): the get_verinfo() call, the verinfo tuple unpacking,
    # and the struct.pack argument lines are missing from this excerpt.
    d.addCallback(lambda ignored:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # SDMF signed prefix: version, seqnum, root hash, salt, k, N,
        # segment size, data length.
        expected_prefix = struct.pack(">BQ32s16s BBQQ",
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
def test_verinfo_with_mdmf_file(self):
    """get_verinfo() on an MDMF share must match the canned test values."""
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): the get_verinfo() call, the verinfo tuple unpacking,
    # and the struct.pack argument lines are missing from this excerpt.
    d.addCallback(lambda ignored:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # MDMF signable prefix: no salt field, unlike SDMF.
        expected_prefix = struct.pack(">BQ32s BBQQ",
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
def test_sdmf_writer(self):
    """SDMFSlotWriteProxy must buffer all writes, then publish in ONE RPC.

    write_count must stay 0 until finish_publishing(), then become 1,
    and the stored bytes must equal build_test_sdmf_share().
    """
    # Go through the motions of writing an SDMF share to the storage
    # server. Then read the storage server to see that the share got
    # written in the way that we think it should have.
    # We do this first so that the necessary instance variables get
    # set the way we want them for the tests below.
    data = self.build_test_sdmf_share()
    # NOTE(review): the remaining SDMFSlotWriteProxy constructor
    # arguments and the "def _then(ignored):" line are missing from
    # this excerpt.
    sdmfr = SDMFSlotWriteProxy(0,
    # Put the block and salt.
    sdmfr.put_block(self.blockdata, 0, self.salt)
    # Put the encprivkey
    sdmfr.put_encprivkey(self.encprivkey)
    # Put the block and share hash chains
    sdmfr.put_blockhashes(self.block_hash_tree)
    sdmfr.put_sharehashes(self.share_hash_chain)
    sdmfr.put_root_hash(self.root_hash)
    sdmfr.put_signature(self.signature)
    # Put the verification key
    sdmfr.put_verification_key(self.verification_key)
    # Now check to make sure that nothing has been written yet.
    self.failUnlessEqual(self.rref.write_count, 0)
    # Now finish publishing
    d = sdmfr.finish_publishing()
    self.failUnlessEqual(self.rref.write_count, 1)
    read = self.ss.remote_slot_readv
    self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
    d.addCallback(_then)
def test_sdmf_writer_preexisting_share(self):
    """Writing over an existing SDMF share requires the right checkstring.

    The first publish (empty checkstring) must fail test-and-set; after
    adopting the server-reported checkstring, the second publish must
    succeed and bump the sequence number to 1.
    """
    data = self.build_test_sdmf_share()
    self.write_sdmf_share_to_server("si1")
    # Now there is a share on the storage server. To successfully
    # write, we need to set the checkstring correctly. When we
    # don't, no write should occur.
    # NOTE(review): the remaining SDMFSlotWriteProxy constructor
    # arguments and the "def _then(results):" line are missing from
    # this excerpt.
    sdmfw = SDMFSlotWriteProxy(0,
    sdmfw.put_block(self.blockdata, 0, self.salt)
    # Put the encprivkey
    sdmfw.put_encprivkey(self.encprivkey)
    # Put the block and share hash chains
    sdmfw.put_blockhashes(self.block_hash_tree)
    sdmfw.put_sharehashes(self.share_hash_chain)
    sdmfw.put_root_hash(self.root_hash)
    sdmfw.put_signature(self.signature)
    # Put the verification key
    sdmfw.put_verification_key(self.verification_key)
    # We shouldn't have a checkstring yet
    self.failUnlessEqual(sdmfw.get_checkstring(), "")
    d = sdmfw.finish_publishing()
    # The test-and-set failed; the server reports the real checkstring.
    self.failIf(results[0])
    # this is the correct checkstring
    self._expected_checkstring = results[1][0][0]
    return self._expected_checkstring
    d.addCallback(_then)
    d.addCallback(sdmfw.set_checkstring)
    d.addCallback(lambda ignored:
        sdmfw.get_checkstring())
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, self._expected_checkstring))
    d.addCallback(lambda ignored:
        sdmfw.finish_publishing())
    def _then_again(results):
        self.failUnless(results[0])
        read = self.ss.remote_slot_readv
        # seqnum (offset 1, 8 bytes) must now be 1.
        self.failUnlessEqual(read("si1", [0], [(1, 8)]),
                             {0: [struct.pack(">Q", 1)]})
        self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
    d.addCallback(_then_again)
class Stats(unittest.TestCase):
    """Tests for StorageServer latency statistics (add_latency/get_latencies).

    NOTE(review): the "def setUp(self):" and "def tearDown(self):" lines,
    and several for-loop headers inside test_latencies, appear to be
    missing from this excerpt -- confirm against upstream.
    """
    self.sparent = LoggingServiceParent()
    self._lease_secret = itertools.count()
    return self.sparent.stopService()

    def workdir(self, name):
        # Per-test working directory under storage/Server/.
        basedir = os.path.join("storage", "Server", name)

    def create(self, name):
        # Build a StorageServer (all-zero node id) attached to the parent.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_latencies(self):
        """Percentile buckets must be None when too few samples exist."""
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        ss.add_latency("write", 1.0 * i)
        ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
        output = ss.get_latencies()
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))
        # Only the most recent 1000 samples are retained per category.
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
        # With only 20 samples, the 99th/99.9th percentiles are undefined.
        self.failUnlessEqual(len(ss.latencies["write"]), 20)
        self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
        self.failUnless(output["write"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
        self.failUnless(output["write"]["99_0_percentile"] is None, output)
        self.failUnless(output["write"]["99_9_percentile"] is None, output)
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
        self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
        # A single sample: even the mean is reported as None.
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(output["get"]["mean"] is None, output)
        self.failUnless(output["get"]["01_0_percentile"] is None, output)
        self.failUnless(output["get"]["10_0_percentile"] is None, output)
        self.failUnless(output["get"]["50_0_percentile"] is None, output)
        self.failUnless(output["get"]["90_0_percentile"] is None, output)
        self.failUnless(output["get"]["95_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_9_percentile"] is None, output)
# NOTE(review): the enclosing "def remove_tags(s):" and its return
# statement appear trimmed from this excerpt. These lines strip HTML
# tags and collapse runs of whitespace to single spaces.
s = re.sub(r'<[^>]*>', ' ', s)
s = re.sub(r'\s+', ' ', s)
class MyBucketCountingCrawler(BucketCountingCrawler):
    """BucketCountingCrawler that pops and fires a hook Deferred per prefix."""
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # NOTE(review): the guard around hook_ds and the d.callback(None)
        # line appear trimmed from this excerpt.
        d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer variant whose bucket counter is the hookable test crawler."""
    def add_bucket_counter(self):
        # Same wiring as the base class, but with MyBucketCountingCrawler
        # so tests can observe each finished prefix.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Tests for the bucket-counting crawler and its web status rendering.

    NOTE(review): the "def setUp(self):"/"def tearDown(self):" lines and
    several loop/def lines inside the tests appear to be missing from
    this excerpt -- confirm against upstream.
    """
    self.s = service.MultiService()
    self.s.startService()
    return self.s.stopService()

    def test_bucket_counter(self):
        """A full crawl cycle must end with 'Total buckets: 0' on the status page."""
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)
        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                # Not yet -- poll again on the next reactor turn.
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)
        # now give it enough time to complete a full cycle
        # NOTE(review): the "def _watch():" line appears trimmed here.
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)

    def test_bucket_counter_cleanup(self):
        """Bogus state injected mid-cycle must be gone after the cycle ends."""
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        d = fireEventually()
        def _after_first_prefix(ignored):
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                # Not yet -- poll again on the next reactor turn.
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)
        # now give it enough time to complete a cycle
        # NOTE(review): the "def _watch():" line appears trimmed here.
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)

    def test_bucket_counter_eta(self):
        """An ETA appears on the status page once at least one prefix finished."""
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)
        d = defer.Deferred()
        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)
        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)
        ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """A LeaseCheckingCrawler that tests can pause after one bucket.

    Setting ``stop_after_first_bucket = True`` makes the crawler drive its
    ``cpu_slice`` negative right after the first bucket is processed (which
    makes it yield — see the tests that set ``cpu_slice = -1.0`` elsewhere
    in this file to "stop quickly"), so a test can inspect mid-cycle state.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        # do the real work first, then arrange for an immediate yield
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # once the one-shot pause has been consumed, let the rest of the
        # cycle run with a generous slice so it finishes quickly
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500
3039 class BrokenStatResults:
3041 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3044 bsr = BrokenStatResults()
3045 for attrname in dir(s):
3046 if attrname.startswith("_"):
3048 if attrname == "st_blocks":
3050 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    # A StorageServer whose lease checker is the pausable
    # InstrumentedLeaseCheckingCrawler, so tests can stop it mid-cycle.
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # A StorageServer whose lease checker sees stat() results without the
    # st_blocks field, to exercise the diskbytes==sharebytes fallback path
    # (see test_no_st_blocks below).
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3058 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3061 self.s = service.MultiService()
3062 self.s.startService()
3064 return self.s.stopService()
3066 def make_shares(self, ss):
3068 return (si, hashutil.tagged_hash("renew", si),
3069 hashutil.tagged_hash("cancel", si))
3070 def make_mutable(si):
3071 return (si, hashutil.tagged_hash("renew", si),
3072 hashutil.tagged_hash("cancel", si),
3073 hashutil.tagged_hash("write-enabler", si))
3074 def make_extra_lease(si, num):
3075 return (hashutil.tagged_hash("renew-%d" % num, si),
3076 hashutil.tagged_hash("cancel-%d" % num, si))
3078 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3079 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3080 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3081 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3082 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3083 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3085 canary = FakeCanary()
3086 # note: 'tahoe debug dump-share' will not handle this file, since the
3087 # inner contents are not a valid CHK share
3088 data = "\xff" * 1000
3090 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3092 w[0].remote_write(0, data)
3095 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3097 w[0].remote_write(0, data)
3099 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3101 writev = ss.remote_slot_testv_and_readv_and_writev
3102 writev(mutable_si_2, (we2, rs2, cs2),
3103 {0: ([], [(0,data)], len(data))}, [])
3104 writev(mutable_si_3, (we3, rs3, cs3),
3105 {0: ([], [(0,data)], len(data))}, [])
3106 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3108 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3109 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3110 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3112 def test_basic(self):
3113 basedir = "storage/LeaseCrawler/basic"
3114 fileutil.make_dirs(basedir)
3115 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3116 # make it start sooner than usual.
3117 lc = ss.lease_checker
3120 lc.stop_after_first_bucket = True
3121 webstatus = StorageStatus(ss)
3123 # create a few shares, with some leases on them
3124 self.make_shares(ss)
3125 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3127 # add a non-sharefile to exercise another code path
3128 fn = os.path.join(ss.sharedir,
3129 storage_index_to_dir(immutable_si_0),
3132 f.write("I am not a share.\n")
3135 # this is before the crawl has started, so we're not in a cycle yet
3136 initial_state = lc.get_state()
3137 self.failIf(lc.get_progress()["cycle-in-progress"])
3138 self.failIfIn("cycle-to-date", initial_state)
3139 self.failIfIn("estimated-remaining-cycle", initial_state)
3140 self.failIfIn("estimated-current-cycle", initial_state)
3141 self.failUnlessIn("history", initial_state)
3142 self.failUnlessEqual(initial_state["history"], {})
3144 ss.setServiceParent(self.s)
3148 d = fireEventually()
3150 # now examine the state right after the first bucket has been
3152 def _after_first_bucket(ignored):
3153 initial_state = lc.get_state()
3154 if "cycle-to-date" not in initial_state:
3155 d2 = fireEventually()
3156 d2.addCallback(_after_first_bucket)
3158 self.failUnlessIn("cycle-to-date", initial_state)
3159 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3160 self.failUnlessIn("estimated-current-cycle", initial_state)
3161 self.failUnlessIn("history", initial_state)
3162 self.failUnlessEqual(initial_state["history"], {})
3164 so_far = initial_state["cycle-to-date"]
3165 self.failUnlessEqual(so_far["expiration-enabled"], False)
3166 self.failUnlessIn("configured-expiration-mode", so_far)
3167 self.failUnlessIn("lease-age-histogram", so_far)
3168 lah = so_far["lease-age-histogram"]
3169 self.failUnlessEqual(type(lah), list)
3170 self.failUnlessEqual(len(lah), 1)
3171 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3172 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3173 self.failUnlessEqual(so_far["corrupt-shares"], [])
3174 sr1 = so_far["space-recovered"]
3175 self.failUnlessEqual(sr1["examined-buckets"], 1)
3176 self.failUnlessEqual(sr1["examined-shares"], 1)
3177 self.failUnlessEqual(sr1["actual-shares"], 0)
3178 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3179 self.failUnlessEqual(sr1["original-sharebytes"], 0)
3180 left = initial_state["estimated-remaining-cycle"]
3181 sr2 = left["space-recovered"]
3182 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3183 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3184 self.failIfEqual(sr2["actual-shares"], None)
3185 self.failIfEqual(sr2["configured-diskbytes"], None)
3186 self.failIfEqual(sr2["original-sharebytes"], None)
3187 d.addCallback(_after_first_bucket)
3188 d.addCallback(lambda ign: self.render1(webstatus))
3189 def _check_html_in_cycle(html):
3190 s = remove_tags(html)
3191 self.failUnlessIn("So far, this cycle has examined "
3192 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3193 self.failUnlessIn("and has recovered: "
3194 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3195 "0 B (0 B / 0 B)", s)
3196 self.failUnlessIn("If expiration were enabled, "
3197 "we would have recovered: "
3198 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3199 " 0 B (0 B / 0 B) by now", s)
3200 self.failUnlessIn("and the remainder of this cycle "
3201 "would probably recover: "
3202 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3203 " 0 B (0 B / 0 B)", s)
3204 self.failUnlessIn("and the whole cycle would probably recover: "
3205 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3206 " 0 B (0 B / 0 B)", s)
3207 self.failUnlessIn("if we were strictly using each lease's default "
3208 "31-day lease lifetime", s)
3209 self.failUnlessIn("this cycle would be expected to recover: ", s)
3210 d.addCallback(_check_html_in_cycle)
3212 # wait for the crawler to finish the first cycle. Nothing should have
3215 return bool(lc.get_state()["last-cycle-finished"] is not None)
3216 d.addCallback(lambda ign: self.poll(_wait))
3218 def _after_first_cycle(ignored):
3220 self.failIf("cycle-to-date" in s)
3221 self.failIf("estimated-remaining-cycle" in s)
3222 self.failIf("estimated-current-cycle" in s)
3223 last = s["history"][0]
3224 self.failUnlessIn("cycle-start-finish-times", last)
3225 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3226 self.failUnlessEqual(last["expiration-enabled"], False)
3227 self.failUnlessIn("configured-expiration-mode", last)
3229 self.failUnlessIn("lease-age-histogram", last)
3230 lah = last["lease-age-histogram"]
3231 self.failUnlessEqual(type(lah), list)
3232 self.failUnlessEqual(len(lah), 1)
3233 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3235 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3236 self.failUnlessEqual(last["corrupt-shares"], [])
3238 rec = last["space-recovered"]
3239 self.failUnlessEqual(rec["examined-buckets"], 4)
3240 self.failUnlessEqual(rec["examined-shares"], 4)
3241 self.failUnlessEqual(rec["actual-buckets"], 0)
3242 self.failUnlessEqual(rec["original-buckets"], 0)
3243 self.failUnlessEqual(rec["configured-buckets"], 0)
3244 self.failUnlessEqual(rec["actual-shares"], 0)
3245 self.failUnlessEqual(rec["original-shares"], 0)
3246 self.failUnlessEqual(rec["configured-shares"], 0)
3247 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3248 self.failUnlessEqual(rec["original-diskbytes"], 0)
3249 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3250 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3251 self.failUnlessEqual(rec["original-sharebytes"], 0)
3252 self.failUnlessEqual(rec["configured-sharebytes"], 0)
3254 def _get_sharefile(si):
3255 return list(ss._iter_share_files(si))[0]
3256 def count_leases(si):
3257 return len(list(_get_sharefile(si).get_leases()))
3258 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3259 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3260 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3261 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3262 d.addCallback(_after_first_cycle)
3263 d.addCallback(lambda ign: self.render1(webstatus))
3264 def _check_html(html):
3265 s = remove_tags(html)
3266 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3267 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3268 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3269 "(2 mutable / 2 immutable),", s)
3270 self.failUnlessIn("but expiration was not enabled", s)
3271 d.addCallback(_check_html)
3272 d.addCallback(lambda ign: self.render_json(webstatus))
3273 def _check_json(json):
3274 data = simplejson.loads(json)
3275 self.failUnlessIn("lease-checker", data)
3276 self.failUnlessIn("lease-checker-progress", data)
3277 d.addCallback(_check_json)
3280 def backdate_lease(self, sf, renew_secret, new_expire_time):
3281 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3282 # "renew" a lease with a new_expire_time that is older than what the
3283 # current lease has), so we have to reach inside it.
3284 for i,lease in enumerate(sf.get_leases()):
3285 if lease.renew_secret == renew_secret:
3286 lease.expiration_time = new_expire_time
3287 f = open(sf.home, 'rb+')
3288 sf._write_lease_record(f, i, lease)
3291 raise IndexError("unable to renew non-existent lease")
3293 def test_expire_age(self):
3294 basedir = "storage/LeaseCrawler/expire_age"
3295 fileutil.make_dirs(basedir)
3296 # setting expiration_time to 2000 means that any lease which is more
3297 # than 2000s old will be expired.
3298 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3299 expiration_enabled=True,
3300 expiration_mode="age",
3301 expiration_override_lease_duration=2000)
3302 # make it start sooner than usual.
3303 lc = ss.lease_checker
3305 lc.stop_after_first_bucket = True
3306 webstatus = StorageStatus(ss)
3308 # create a few shares, with some leases on them
3309 self.make_shares(ss)
3310 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3312 def count_shares(si):
3313 return len(list(ss._iter_share_files(si)))
3314 def _get_sharefile(si):
3315 return list(ss._iter_share_files(si))[0]
3316 def count_leases(si):
3317 return len(list(_get_sharefile(si).get_leases()))
3319 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3320 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3321 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3322 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3323 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3324 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3325 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3326 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3328 # artificially crank back the expiration time on the first lease of
3329 # each share, to make it look like it expired already (age=1000s).
3330 # Some shares have an extra lease which is set to expire at the
3331 # default time in 31 days from now (age=31days). We then run the
3332 # crawler, which will expire the first lease, making some shares get
3333 # deleted and others stay alive (with one remaining lease)
3336 sf0 = _get_sharefile(immutable_si_0)
3337 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3338 sf0_size = os.stat(sf0.home).st_size
3340 # immutable_si_1 gets an extra lease
3341 sf1 = _get_sharefile(immutable_si_1)
3342 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3344 sf2 = _get_sharefile(mutable_si_2)
3345 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3346 sf2_size = os.stat(sf2.home).st_size
3348 # mutable_si_3 gets an extra lease
3349 sf3 = _get_sharefile(mutable_si_3)
3350 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3352 ss.setServiceParent(self.s)
3354 d = fireEventually()
3355 # examine the state right after the first bucket has been processed
3356 def _after_first_bucket(ignored):
3357 p = lc.get_progress()
3358 if not p["cycle-in-progress"]:
3359 d2 = fireEventually()
3360 d2.addCallback(_after_first_bucket)
3362 d.addCallback(_after_first_bucket)
3363 d.addCallback(lambda ign: self.render1(webstatus))
3364 def _check_html_in_cycle(html):
3365 s = remove_tags(html)
3366 # the first bucket encountered gets deleted, and its prefix
3367 # happens to be about 1/5th of the way through the ring, so the
3368 # predictor thinks we'll have 5 shares and that we'll delete them
3369 # all. This part of the test depends upon the SIs landing right
3370 # where they do now.
3371 self.failUnlessIn("The remainder of this cycle is expected to "
3372 "recover: 4 shares, 4 buckets", s)
3373 self.failUnlessIn("The whole cycle is expected to examine "
3374 "5 shares in 5 buckets and to recover: "
3375 "5 shares, 5 buckets", s)
3376 d.addCallback(_check_html_in_cycle)
3378 # wait for the crawler to finish the first cycle. Two shares should
3381 return bool(lc.get_state()["last-cycle-finished"] is not None)
3382 d.addCallback(lambda ign: self.poll(_wait))
3384 def _after_first_cycle(ignored):
3385 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3386 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3387 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3388 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3389 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3390 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3393 last = s["history"][0]
3395 self.failUnlessEqual(last["expiration-enabled"], True)
3396 self.failUnlessEqual(last["configured-expiration-mode"],
3397 ("age", 2000, None, ("mutable", "immutable")))
3398 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3400 rec = last["space-recovered"]
3401 self.failUnlessEqual(rec["examined-buckets"], 4)
3402 self.failUnlessEqual(rec["examined-shares"], 4)
3403 self.failUnlessEqual(rec["actual-buckets"], 2)
3404 self.failUnlessEqual(rec["original-buckets"], 2)
3405 self.failUnlessEqual(rec["configured-buckets"], 2)
3406 self.failUnlessEqual(rec["actual-shares"], 2)
3407 self.failUnlessEqual(rec["original-shares"], 2)
3408 self.failUnlessEqual(rec["configured-shares"], 2)
3409 size = sf0_size + sf2_size
3410 self.failUnlessEqual(rec["actual-sharebytes"], size)
3411 self.failUnlessEqual(rec["original-sharebytes"], size)
3412 self.failUnlessEqual(rec["configured-sharebytes"], size)
3413 # different platforms have different notions of "blocks used by
3414 # this file", so merely assert that it's a number
3415 self.failUnless(rec["actual-diskbytes"] >= 0,
3416 rec["actual-diskbytes"])
3417 self.failUnless(rec["original-diskbytes"] >= 0,
3418 rec["original-diskbytes"])
3419 self.failUnless(rec["configured-diskbytes"] >= 0,
3420 rec["configured-diskbytes"])
3421 d.addCallback(_after_first_cycle)
3422 d.addCallback(lambda ign: self.render1(webstatus))
3423 def _check_html(html):
3424 s = remove_tags(html)
3425 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3426 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3427 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3428 d.addCallback(_check_html)
3431 def test_expire_cutoff_date(self):
3432 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3433 fileutil.make_dirs(basedir)
3434 # setting cutoff-date to 2000 seconds ago means that any lease which
3435 # is more than 2000s old will be expired.
3437 then = int(now - 2000)
3438 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3439 expiration_enabled=True,
3440 expiration_mode="cutoff-date",
3441 expiration_cutoff_date=then)
3442 # make it start sooner than usual.
3443 lc = ss.lease_checker
3445 lc.stop_after_first_bucket = True
3446 webstatus = StorageStatus(ss)
3448 # create a few shares, with some leases on them
3449 self.make_shares(ss)
3450 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3452 def count_shares(si):
3453 return len(list(ss._iter_share_files(si)))
3454 def _get_sharefile(si):
3455 return list(ss._iter_share_files(si))[0]
3456 def count_leases(si):
3457 return len(list(_get_sharefile(si).get_leases()))
3459 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3460 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3461 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3462 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3463 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3464 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3465 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3466 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3468 # artificially crank back the expiration time on the first lease of
3469 # each share, to make it look like was renewed 3000s ago. To achieve
3470 # this, we need to set the expiration time to now-3000+31days. This
3471 # will change when the lease format is improved to contain both
3472 # create/renew time and duration.
3473 new_expiration_time = now - 3000 + 31*24*60*60
3475 # Some shares have an extra lease which is set to expire at the
3476 # default time in 31 days from now (age=31days). We then run the
3477 # crawler, which will expire the first lease, making some shares get
3478 # deleted and others stay alive (with one remaining lease)
3480 sf0 = _get_sharefile(immutable_si_0)
3481 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3482 sf0_size = os.stat(sf0.home).st_size
3484 # immutable_si_1 gets an extra lease
3485 sf1 = _get_sharefile(immutable_si_1)
3486 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3488 sf2 = _get_sharefile(mutable_si_2)
3489 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3490 sf2_size = os.stat(sf2.home).st_size
3492 # mutable_si_3 gets an extra lease
3493 sf3 = _get_sharefile(mutable_si_3)
3494 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3496 ss.setServiceParent(self.s)
3498 d = fireEventually()
3499 # examine the state right after the first bucket has been processed
3500 def _after_first_bucket(ignored):
3501 p = lc.get_progress()
3502 if not p["cycle-in-progress"]:
3503 d2 = fireEventually()
3504 d2.addCallback(_after_first_bucket)
3506 d.addCallback(_after_first_bucket)
3507 d.addCallback(lambda ign: self.render1(webstatus))
3508 def _check_html_in_cycle(html):
3509 s = remove_tags(html)
3510 # the first bucket encountered gets deleted, and its prefix
3511 # happens to be about 1/5th of the way through the ring, so the
3512 # predictor thinks we'll have 5 shares and that we'll delete them
3513 # all. This part of the test depends upon the SIs landing right
3514 # where they do now.
3515 self.failUnlessIn("The remainder of this cycle is expected to "
3516 "recover: 4 shares, 4 buckets", s)
3517 self.failUnlessIn("The whole cycle is expected to examine "
3518 "5 shares in 5 buckets and to recover: "
3519 "5 shares, 5 buckets", s)
3520 d.addCallback(_check_html_in_cycle)
3522 # wait for the crawler to finish the first cycle. Two shares should
3525 return bool(lc.get_state()["last-cycle-finished"] is not None)
3526 d.addCallback(lambda ign: self.poll(_wait))
3528 def _after_first_cycle(ignored):
3529 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3530 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3531 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3532 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3533 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3534 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3537 last = s["history"][0]
3539 self.failUnlessEqual(last["expiration-enabled"], True)
3540 self.failUnlessEqual(last["configured-expiration-mode"],
3541 ("cutoff-date", None, then,
3542 ("mutable", "immutable")))
3543 self.failUnlessEqual(last["leases-per-share-histogram"],
3546 rec = last["space-recovered"]
3547 self.failUnlessEqual(rec["examined-buckets"], 4)
3548 self.failUnlessEqual(rec["examined-shares"], 4)
3549 self.failUnlessEqual(rec["actual-buckets"], 2)
3550 self.failUnlessEqual(rec["original-buckets"], 0)
3551 self.failUnlessEqual(rec["configured-buckets"], 2)
3552 self.failUnlessEqual(rec["actual-shares"], 2)
3553 self.failUnlessEqual(rec["original-shares"], 0)
3554 self.failUnlessEqual(rec["configured-shares"], 2)
3555 size = sf0_size + sf2_size
3556 self.failUnlessEqual(rec["actual-sharebytes"], size)
3557 self.failUnlessEqual(rec["original-sharebytes"], 0)
3558 self.failUnlessEqual(rec["configured-sharebytes"], size)
3559 # different platforms have different notions of "blocks used by
3560 # this file", so merely assert that it's a number
3561 self.failUnless(rec["actual-diskbytes"] >= 0,
3562 rec["actual-diskbytes"])
3563 self.failUnless(rec["original-diskbytes"] >= 0,
3564 rec["original-diskbytes"])
3565 self.failUnless(rec["configured-diskbytes"] >= 0,
3566 rec["configured-diskbytes"])
3567 d.addCallback(_after_first_cycle)
3568 d.addCallback(lambda ign: self.render1(webstatus))
3569 def _check_html(html):
3570 s = remove_tags(html)
3571 self.failUnlessIn("Expiration Enabled:"
3572 " expired leases will be removed", s)
3573 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3574 substr = "Leases created or last renewed before %s will be considered expired." % date
3575 self.failUnlessIn(substr, s)
3576 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3577 d.addCallback(_check_html)
3580 def test_only_immutable(self):
3581 basedir = "storage/LeaseCrawler/only_immutable"
3582 fileutil.make_dirs(basedir)
3584 then = int(now - 2000)
3585 ss = StorageServer(basedir, "\x00" * 20,
3586 expiration_enabled=True,
3587 expiration_mode="cutoff-date",
3588 expiration_cutoff_date=then,
3589 expiration_sharetypes=("immutable",))
3590 lc = ss.lease_checker
3592 webstatus = StorageStatus(ss)
3594 self.make_shares(ss)
3595 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3596 # set all leases to be expirable
3597 new_expiration_time = now - 3000 + 31*24*60*60
3599 def count_shares(si):
3600 return len(list(ss._iter_share_files(si)))
3601 def _get_sharefile(si):
3602 return list(ss._iter_share_files(si))[0]
3603 def count_leases(si):
3604 return len(list(_get_sharefile(si).get_leases()))
3606 sf0 = _get_sharefile(immutable_si_0)
3607 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3608 sf1 = _get_sharefile(immutable_si_1)
3609 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3610 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3611 sf2 = _get_sharefile(mutable_si_2)
3612 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3613 sf3 = _get_sharefile(mutable_si_3)
3614 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3615 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3617 ss.setServiceParent(self.s)
3619 return bool(lc.get_state()["last-cycle-finished"] is not None)
3620 d = self.poll(_wait)
3622 def _after_first_cycle(ignored):
3623 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3624 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3625 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3626 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3627 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3628 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3629 d.addCallback(_after_first_cycle)
3630 d.addCallback(lambda ign: self.render1(webstatus))
3631 def _check_html(html):
3632 s = remove_tags(html)
3633 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3634 d.addCallback(_check_html)
3637 def test_only_mutable(self):
3638 basedir = "storage/LeaseCrawler/only_mutable"
3639 fileutil.make_dirs(basedir)
3641 then = int(now - 2000)
3642 ss = StorageServer(basedir, "\x00" * 20,
3643 expiration_enabled=True,
3644 expiration_mode="cutoff-date",
3645 expiration_cutoff_date=then,
3646 expiration_sharetypes=("mutable",))
3647 lc = ss.lease_checker
3649 webstatus = StorageStatus(ss)
3651 self.make_shares(ss)
3652 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3653 # set all leases to be expirable
3654 new_expiration_time = now - 3000 + 31*24*60*60
3656 def count_shares(si):
3657 return len(list(ss._iter_share_files(si)))
3658 def _get_sharefile(si):
3659 return list(ss._iter_share_files(si))[0]
3660 def count_leases(si):
3661 return len(list(_get_sharefile(si).get_leases()))
3663 sf0 = _get_sharefile(immutable_si_0)
3664 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3665 sf1 = _get_sharefile(immutable_si_1)
3666 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3667 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3668 sf2 = _get_sharefile(mutable_si_2)
3669 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3670 sf3 = _get_sharefile(mutable_si_3)
3671 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3672 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3674 ss.setServiceParent(self.s)
3676 return bool(lc.get_state()["last-cycle-finished"] is not None)
3677 d = self.poll(_wait)
3679 def _after_first_cycle(ignored):
3680 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3681 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3682 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3683 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3684 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3685 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3686 d.addCallback(_after_first_cycle)
3687 d.addCallback(lambda ign: self.render1(webstatus))
3688 def _check_html(html):
3689 s = remove_tags(html)
3690 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3691 d.addCallback(_check_html)
3694 def test_bad_mode(self):
3695 basedir = "storage/LeaseCrawler/bad_mode"
3696 fileutil.make_dirs(basedir)
3697 e = self.failUnlessRaises(ValueError,
3698 StorageServer, basedir, "\x00" * 20,
3699 expiration_mode="bogus")
3700 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3702 def test_parse_duration(self):
3706 p = time_format.parse_duration
3707 self.failUnlessEqual(p("7days"), 7*DAY)
3708 self.failUnlessEqual(p("31day"), 31*DAY)
3709 self.failUnlessEqual(p("60 days"), 60*DAY)
3710 self.failUnlessEqual(p("2mo"), 2*MONTH)
3711 self.failUnlessEqual(p("3 month"), 3*MONTH)
3712 self.failUnlessEqual(p("2years"), 2*YEAR)
3713 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3714 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3716 def test_parse_date(self):
3717 p = time_format.parse_date
3718 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3719 self.failUnlessEqual(p("2009-03-18"), 1237334400)
3721 def test_limited_history(self):
3722 basedir = "storage/LeaseCrawler/limited_history"
3723 fileutil.make_dirs(basedir)
3724 ss = StorageServer(basedir, "\x00" * 20)
3725 # make it start sooner than usual.
3726 lc = ss.lease_checker
3730 # create a few shares, with some leases on them
3731 self.make_shares(ss)
3733 ss.setServiceParent(self.s)
3735 def _wait_until_15_cycles_done():
3736 last = lc.state["last-cycle-finished"]
3737 if last is not None and last >= 15:
3742 d = self.poll(_wait_until_15_cycles_done)
3744 def _check(ignored):
3747 self.failUnlessEqual(len(h), 10)
3748 self.failUnlessEqual(max(h.keys()), 15)
3749 self.failUnlessEqual(min(h.keys()), 6)
3750 d.addCallback(_check)
3753 def test_unpredictable_future(self):
3754 basedir = "storage/LeaseCrawler/unpredictable_future"
3755 fileutil.make_dirs(basedir)
3756 ss = StorageServer(basedir, "\x00" * 20)
3757 # make it start sooner than usual.
3758 lc = ss.lease_checker
3760 lc.cpu_slice = -1.0 # stop quickly
3762 self.make_shares(ss)
3764 ss.setServiceParent(self.s)
3766 d = fireEventually()
3767 def _check(ignored):
3768 # this should fire after the first bucket is complete, but before
3769 # the first prefix is complete, so the progress-measurer won't
3770 # think we've gotten far enough to raise our percent-complete
3771 # above 0%, triggering the cannot-predict-the-future code in
3772 # expirer.py . This will have to change if/when the
3773 # progress-measurer gets smart enough to count buckets (we'll
3774 # have to interrupt it even earlier, before it's finished the
3777 if "cycle-to-date" not in s:
3778 d2 = fireEventually()
3779 d2.addCallback(_check)
3781 self.failUnlessIn("cycle-to-date", s)
3782 self.failUnlessIn("estimated-remaining-cycle", s)
3783 self.failUnlessIn("estimated-current-cycle", s)
3785 left = s["estimated-remaining-cycle"]["space-recovered"]
3786 self.failUnlessEqual(left["actual-buckets"], None)
3787 self.failUnlessEqual(left["original-buckets"], None)
3788 self.failUnlessEqual(left["configured-buckets"], None)
3789 self.failUnlessEqual(left["actual-shares"], None)
3790 self.failUnlessEqual(left["original-shares"], None)
3791 self.failUnlessEqual(left["configured-shares"], None)
3792 self.failUnlessEqual(left["actual-diskbytes"], None)
3793 self.failUnlessEqual(left["original-diskbytes"], None)
3794 self.failUnlessEqual(left["configured-diskbytes"], None)
3795 self.failUnlessEqual(left["actual-sharebytes"], None)
3796 self.failUnlessEqual(left["original-sharebytes"], None)
3797 self.failUnlessEqual(left["configured-sharebytes"], None)
3799 full = s["estimated-remaining-cycle"]["space-recovered"]
3800 self.failUnlessEqual(full["actual-buckets"], None)
3801 self.failUnlessEqual(full["original-buckets"], None)
3802 self.failUnlessEqual(full["configured-buckets"], None)
3803 self.failUnlessEqual(full["actual-shares"], None)
3804 self.failUnlessEqual(full["original-shares"], None)
3805 self.failUnlessEqual(full["configured-shares"], None)
3806 self.failUnlessEqual(full["actual-diskbytes"], None)
3807 self.failUnlessEqual(full["original-diskbytes"], None)
3808 self.failUnlessEqual(full["configured-diskbytes"], None)
3809 self.failUnlessEqual(full["actual-sharebytes"], None)
3810 self.failUnlessEqual(full["original-sharebytes"], None)
3811 self.failUnlessEqual(full["configured-sharebytes"], None)
3813 d.addCallback(_check)
3816 def test_no_st_blocks(self):
3817 basedir = "storage/LeaseCrawler/no_st_blocks"
3818 fileutil.make_dirs(basedir)
3819 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3820 expiration_mode="age",
3821 expiration_override_lease_duration=-1000)
3822 # a negative expiration_time= means the "configured-"
3823 # space-recovered counts will be non-zero, since all shares will have
3826 # make it start sooner than usual.
3827 lc = ss.lease_checker
3830 self.make_shares(ss)
3831 ss.setServiceParent(self.s)
3833 return bool(lc.get_state()["last-cycle-finished"] is not None)
3834 d = self.poll(_wait)
3836 def _check(ignored):
3838 last = s["history"][0]
3839 rec = last["space-recovered"]
3840 self.failUnlessEqual(rec["configured-buckets"], 4)
3841 self.failUnlessEqual(rec["configured-shares"], 4)
3842 self.failUnless(rec["configured-sharebytes"] > 0,
3843 rec["configured-sharebytes"])
3844 # without the .st_blocks field in os.stat() results, we should be
3845 # reporting diskbytes==sharebytes
3846 self.failUnlessEqual(rec["configured-sharebytes"],
3847 rec["configured-diskbytes"])
3848 d.addCallback(_check)
3851 def test_share_corruption(self):
3852 self._poll_should_ignore_these_errors = [
3853 UnknownMutableContainerVersionError,
3854 UnknownImmutableContainerVersionError,
3856 basedir = "storage/LeaseCrawler/share_corruption"
3857 fileutil.make_dirs(basedir)
3858 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3859 w = StorageStatus(ss)
3860 # make it start sooner than usual.
3861 lc = ss.lease_checker
3862 lc.stop_after_first_bucket = True
3866 # create a few shares, with some leases on them
3867 self.make_shares(ss)
3869 # now corrupt one, and make sure the lease-checker keeps going
3870 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3871 first = min(self.sis)
3872 first_b32 = base32.b2a(first)
3873 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3876 f.write("BAD MAGIC")
3878 # if get_share_file() doesn't see the correct mutable magic, it
3879 # assumes the file is an immutable share, and then
3880 # immutable.ShareFile sees a bad version. So regardless of which kind
3881 # of share we corrupted, this will trigger an
3882 # UnknownImmutableContainerVersionError.
3884 # also create an empty bucket
3885 empty_si = base32.b2a("\x04"*16)
3886 empty_bucket_dir = os.path.join(ss.sharedir,
3887 storage_index_to_dir(empty_si))
3888 fileutil.make_dirs(empty_bucket_dir)
3890 ss.setServiceParent(self.s)
3892 d = fireEventually()
3894 # now examine the state right after the first bucket has been
3896 def _after_first_bucket(ignored):
3898 if "cycle-to-date" not in s:
3899 d2 = fireEventually()
3900 d2.addCallback(_after_first_bucket)
3902 so_far = s["cycle-to-date"]
3903 rec = so_far["space-recovered"]
3904 self.failUnlessEqual(rec["examined-buckets"], 1)
3905 self.failUnlessEqual(rec["examined-shares"], 0)
3906 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3907 d.addCallback(_after_first_bucket)
3909 d.addCallback(lambda ign: self.render_json(w))
3910 def _check_json(json):
3911 data = simplejson.loads(json)
3912 # grr. json turns all dict keys into strings.
3913 so_far = data["lease-checker"]["cycle-to-date"]
3914 corrupt_shares = so_far["corrupt-shares"]
3915 # it also turns all tuples into lists
3916 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3917 d.addCallback(_check_json)
3918 d.addCallback(lambda ign: self.render1(w))
3919 def _check_html(html):
3920 s = remove_tags(html)
3921 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3922 d.addCallback(_check_html)
3925 return bool(lc.get_state()["last-cycle-finished"] is not None)
3926 d.addCallback(lambda ign: self.poll(_wait))
3928 def _after_first_cycle(ignored):
3930 last = s["history"][0]
3931 rec = last["space-recovered"]
3932 self.failUnlessEqual(rec["examined-buckets"], 5)
3933 self.failUnlessEqual(rec["examined-shares"], 3)
3934 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3935 d.addCallback(_after_first_cycle)
3936 d.addCallback(lambda ign: self.render_json(w))
3937 def _check_json_history(json):
3938 data = simplejson.loads(json)
3939 last = data["lease-checker"]["history"]["0"]
3940 corrupt_shares = last["corrupt-shares"]
3941 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3942 d.addCallback(_check_json_history)
3943 d.addCallback(lambda ign: self.render1(w))
3944 def _check_html_history(html):
3945 s = remove_tags(html)
3946 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3947 d.addCallback(_check_html_history)
3950 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3951 UnknownImmutableContainerVersionError)
3956 def render_json(self, page):
3957 d = self.render1(page, args={"t": ["json"]})
3960 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3963 self.s = service.MultiService()
3964 self.s.startService()
3966 return self.s.stopService()
3968 def test_no_server(self):
3969 w = StorageStatus(None)
3970 html = w.renderSynchronously()
3971 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3973 def test_status(self):
3974 basedir = "storage/WebStatus/status"
3975 fileutil.make_dirs(basedir)
3976 nodeid = "\x00" * 20
3977 ss = StorageServer(basedir, nodeid)
3978 ss.setServiceParent(self.s)
3979 w = StorageStatus(ss, "nickname")
3981 def _check_html(html):
3982 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3983 s = remove_tags(html)
3984 self.failUnlessIn("Server Nickname: nickname", s)
3985 self.failUnlessIn("Server Nodeid: %s" % base32.b2a(nodeid), s)
3986 self.failUnlessIn("Accepting new shares: Yes", s)
3987 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3988 d.addCallback(_check_html)
3989 d.addCallback(lambda ign: self.render_json(w))
3990 def _check_json(json):
3991 data = simplejson.loads(json)
3993 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3994 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3995 self.failUnlessIn("bucket-counter", data)
3996 self.failUnlessIn("lease-checker", data)
3997 d.addCallback(_check_json)
4000 def render_json(self, page):
4001 d = self.render1(page, args={"t": ["json"]})
4004 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4005 def test_status_no_disk_stats(self, mock_get_disk_stats):
4006 mock_get_disk_stats.side_effect = AttributeError()
4008 # Some platforms may have no disk stats API. Make sure the code can handle that
4009 # (test runs on all platforms).
4010 basedir = "storage/WebStatus/status_no_disk_stats"
4011 fileutil.make_dirs(basedir)
4012 ss = StorageServer(basedir, "\x00" * 20)
4013 ss.setServiceParent(self.s)
4014 w = StorageStatus(ss)
4015 html = w.renderSynchronously()
4016 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4017 s = remove_tags(html)
4018 self.failUnlessIn("Accepting new shares: Yes", s)
4019 self.failUnlessIn("Total disk space: ?", s)
4020 self.failUnlessIn("Space Available to Tahoe: ?", s)
4021 self.failUnless(ss.get_available_space() is None)
4023 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4024 def test_status_bad_disk_stats(self, mock_get_disk_stats):
4025 mock_get_disk_stats.side_effect = OSError()
4027 # If the API to get disk stats exists but a call to it fails, then the status should
4028 # show that no shares will be accepted, and get_available_space() should be 0.
4029 basedir = "storage/WebStatus/status_bad_disk_stats"
4030 fileutil.make_dirs(basedir)
4031 ss = StorageServer(basedir, "\x00" * 20)
4032 ss.setServiceParent(self.s)
4033 w = StorageStatus(ss)
4034 html = w.renderSynchronously()
4035 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4036 s = remove_tags(html)
4037 self.failUnlessIn("Accepting new shares: No", s)
4038 self.failUnlessIn("Total disk space: ?", s)
4039 self.failUnlessIn("Space Available to Tahoe: ?", s)
4040 self.failUnlessEqual(ss.get_available_space(), 0)
4042 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4043 def test_status_right_disk_stats(self, mock_get_disk_stats):
4046 free_for_root = 4*GB
4047 free_for_nonroot = 3*GB
4048 reserved_space = 1*GB
4049 used = total - free_for_root
4050 avail = max(free_for_nonroot - reserved_space, 0)
4051 mock_get_disk_stats.return_value = {
4053 'free_for_root': free_for_root,
4054 'free_for_nonroot': free_for_nonroot,
4059 basedir = "storage/WebStatus/status_right_disk_stats"
4060 fileutil.make_dirs(basedir)
4061 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4062 expecteddir = ss.sharedir
4063 ss.setServiceParent(self.s)
4064 w = StorageStatus(ss)
4065 html = w.renderSynchronously()
4067 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4068 mock_get_disk_stats.call_args_list)
4070 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4071 s = remove_tags(html)
4072 self.failUnlessIn("Total disk space: 5.00 GB", s)
4073 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4074 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4075 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4076 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4077 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4078 self.failUnlessEqual(ss.get_available_space(), 2*GB)
4080 def test_readonly(self):
4081 basedir = "storage/WebStatus/readonly"
4082 fileutil.make_dirs(basedir)
4083 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4084 ss.setServiceParent(self.s)
4085 w = StorageStatus(ss)
4086 html = w.renderSynchronously()
4087 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4088 s = remove_tags(html)
4089 self.failUnlessIn("Accepting new shares: No", s)
4091 def test_reserved(self):
4092 basedir = "storage/WebStatus/reserved"
4093 fileutil.make_dirs(basedir)
4094 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4095 ss.setServiceParent(self.s)
4096 w = StorageStatus(ss)
4097 html = w.renderSynchronously()
4098 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4099 s = remove_tags(html)
4100 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4102 def test_huge_reserved(self):
4103 basedir = "storage/WebStatus/reserved"
4104 fileutil.make_dirs(basedir)
4105 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4106 ss.setServiceParent(self.s)
4107 w = StorageStatus(ss)
4108 html = w.renderSynchronously()
4109 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4110 s = remove_tags(html)
4111 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4113 def test_util(self):
4114 w = StorageStatus(None)
4115 self.failUnlessEqual(w.render_space(None, None), "?")
4116 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4117 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4118 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4119 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4120 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)