1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
    def __init__(self, ignore_disconnectors=False):
        # Flag consulted by the (not shown here) notify/dontNotify logic;
        # presumably it short-circuits disconnect registration — TODO confirm
        # against the full class body.
        self.ignore = ignore_disconnectors
        # marker -> (callback, args, kwargs), populated by notifyOnDisconnect
        self.disconnectors = {}
43 def notifyOnDisconnect(self, f, *args, **kwargs):
47 self.disconnectors[m] = (f, args, kwargs)
49 def dontNotifyOnDisconnect(self, marker):
52 del self.disconnectors[marker]
54 class FakeStatsProvider:
55 def count(self, name, delta=1):
57 def register_producer(self, producer):
60 class Bucket(unittest.TestCase):
61 def make_workdir(self, name):
62 basedir = os.path.join("storage", "Bucket", name)
63 incoming = os.path.join(basedir, "tmp", "bucket")
64 final = os.path.join(basedir, "bucket")
65 fileutil.make_dirs(basedir)
66 fileutil.make_dirs(os.path.join(basedir, "tmp"))
67 return incoming, final
69 def bucket_writer_closed(self, bw, consumed):
71 def add_latency(self, category, latency):
73 def count(self, name, delta=1):
78 renew_secret = os.urandom(32)
79 cancel_secret = os.urandom(32)
80 expiration_time = time.time() + 5000
81 return LeaseInfo(owner_num, renew_secret, cancel_secret,
82 expiration_time, "\x00" * 20)
84 def test_create(self):
85 incoming, final = self.make_workdir("test_create")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*25)
91 bw.remote_write(75, "d"*7)
94 def test_readwrite(self):
95 incoming, final = self.make_workdir("test_readwrite")
96 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
98 bw.remote_write(0, "a"*25)
99 bw.remote_write(25, "b"*25)
100 bw.remote_write(50, "c"*7) # last block may be short
104 br = BucketReader(self, bw.finalhome)
105 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
109 def test_read_past_end_of_share_data(self):
110 # test vector for immutable files (hard-coded contents of an immutable share
113 # The following immutable share file content is identical to that
114 # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
115 # with share data == 'a'. The total size of this content is 85
118 containerdata = struct.pack('>LLL', 1, 1, 1)
120 # A Tahoe-LAFS storage client would send as the share_data a
121 # complicated string involving hash trees and a URI Extension Block
122 # -- see allmydata/immutable/layout.py . This test, which is
123 # simulating a client, just sends 'a'.
126 ownernumber = struct.pack('>L', 0)
127 renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
128 assert len(renewsecret) == 32
129 cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
130 assert len(cancelsecret) == 32
131 expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
133 lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
135 share_file_data = containerdata + share_data + lease_data
137 incoming, final = self.make_workdir("test_read_past_end_of_share_data")
139 fileutil.write(final, share_file_data)
141 mockstorageserver = mock.Mock()
144 br = BucketReader(mockstorageserver, final)
146 self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
148 # Read past the end of share data to get the cancel secret.
149 read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
151 result_of_read = br.remote_read(0, read_length)
152 self.failUnlessEqual(result_of_read, share_data)
154 result_of_read = br.remote_read(0, len(share_data)+1)
155 self.failUnlessEqual(result_of_read, share_data)
163 def callRemote(self, methname, *args, **kwargs):
165 meth = getattr(self.target, "remote_" + methname)
166 return meth(*args, **kwargs)
168 if methname == "slot_readv":
170 if "writev" in methname:
171 self.write_count += 1
173 return defer.maybeDeferred(_call)
176 class BucketProxy(unittest.TestCase):
177 def make_bucket(self, name, size):
178 basedir = os.path.join("storage", "BucketProxy", name)
179 incoming = os.path.join(basedir, "tmp", "bucket")
180 final = os.path.join(basedir, "bucket")
181 fileutil.make_dirs(basedir)
182 fileutil.make_dirs(os.path.join(basedir, "tmp"))
183 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
189 def make_lease(self):
191 renew_secret = os.urandom(32)
192 cancel_secret = os.urandom(32)
193 expiration_time = time.time() + 5000
194 return LeaseInfo(owner_num, renew_secret, cancel_secret,
195 expiration_time, "\x00" * 20)
197 def bucket_writer_closed(self, bw, consumed):
199 def add_latency(self, category, latency):
201 def count(self, name, delta=1):
204 def test_create(self):
205 bw, rb, sharefname = self.make_bucket("test_create", 500)
206 bp = WriteBucketProxy(rb, None,
211 uri_extension_size_max=500)
212 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
214 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
215 # Let's pretend each share has 100 bytes of data, and that there are
216 # 4 segments (25 bytes each), and 8 shares total. So the two
217 # per-segment merkle trees (crypttext_hash_tree,
218 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
219 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
220 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
221 # long. That should make the whole share:
223 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
224 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
226 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
228 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
230 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
232 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
234 uri_extension = "s" + "E"*498 + "e"
236 bw, rb, sharefname = self.make_bucket(name, sharesize)
237 bp = wbp_class(rb, None,
242 uri_extension_size_max=len(uri_extension))
245 d.addCallback(lambda res: bp.put_block(0, "a"*25))
246 d.addCallback(lambda res: bp.put_block(1, "b"*25))
247 d.addCallback(lambda res: bp.put_block(2, "c"*25))
248 d.addCallback(lambda res: bp.put_block(3, "d"*20))
249 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
250 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
251 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
252 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
253 d.addCallback(lambda res: bp.close())
255 # now read everything back
256 def _start_reading(res):
257 br = BucketReader(self, sharefname)
260 server = NoNetworkServer("abc", None)
261 rbp = rbp_class(rb, server, storage_index="")
262 self.failUnlessIn("to peer", repr(rbp))
263 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
265 d1 = rbp.get_block_data(0, 25, 25)
266 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
267 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
268 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
269 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
270 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
271 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
272 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
274 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
275 d1.addCallback(lambda res:
276 self.failUnlessEqual(res, crypttext_hashes))
277 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
278 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
279 d1.addCallback(lambda res: rbp.get_share_hashes())
280 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
281 d1.addCallback(lambda res: rbp.get_uri_extension())
282 d1.addCallback(lambda res:
283 self.failUnlessEqual(res, uri_extension))
287 d.addCallback(_start_reading)
291 def test_readwrite_v1(self):
292 return self._do_test_readwrite("test_readwrite_v1",
293 0x24, WriteBucketProxy, ReadBucketProxy)
295 def test_readwrite_v2(self):
296 return self._do_test_readwrite("test_readwrite_v2",
297 0x44, WriteBucketProxy_v2, ReadBucketProxy)
299 class Server(unittest.TestCase):
302 self.sparent = LoggingServiceParent()
303 self.sparent.startService()
304 self._lease_secret = itertools.count()
306 return self.sparent.stopService()
308 def workdir(self, name):
309 basedir = os.path.join("storage", "Server", name)
312 def create(self, name, reserved_space=0, klass=StorageServer):
313 workdir = self.workdir(name)
314 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
315 stats_provider=FakeStatsProvider())
316 ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: constructing a StorageServer via the create() helper
        # (and attaching it to the service parent) must not raise.
        self.create("test_create")
322 def test_declares_fixed_1528(self):
323 ss = self.create("test_declares_fixed_1528")
324 ver = ss.remote_get_version()
325 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
326 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
328 def test_declares_maximum_share_sizes(self):
329 ss = self.create("test_declares_maximum_share_sizes")
330 ver = ss.remote_get_version()
331 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
332 self.failUnlessIn('maximum-immutable-share-size', sv1)
333 self.failUnlessIn('maximum-mutable-share-size', sv1)
335 def test_declares_available_space(self):
336 ss = self.create("test_declares_available_space")
337 ver = ss.remote_get_version()
338 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
339 self.failUnlessIn('available-space', sv1)
341 def allocate(self, ss, storage_index, sharenums, size, canary=None):
342 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
343 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
345 canary = FakeCanary()
346 return ss.remote_allocate_buckets(storage_index,
347 renew_secret, cancel_secret,
348 sharenums, size, canary)
350 def test_large_share(self):
351 syslow = platform.system().lower()
352 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
353 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
355 avail = fileutil.get_available_space('.', 512*2**20)
357 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
359 ss = self.create("test_large_share")
361 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
362 self.failUnlessEqual(already, set())
363 self.failUnlessEqual(set(writers.keys()), set([0]))
365 shnum, bucket = writers.items()[0]
366 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
367 bucket.remote_write(2**32, "ab")
368 bucket.remote_close()
370 readers = ss.remote_get_buckets("allocate")
371 reader = readers[shnum]
372 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
374 def test_dont_overfill_dirs(self):
376 This test asserts that if you add a second share whose storage index
377 share lots of leading bits with an extant share (but isn't the exact
378 same storage index), this won't add an entry to the share directory.
380 ss = self.create("test_dont_overfill_dirs")
381 already, writers = self.allocate(ss, "storageindex", [0], 10)
382 for i, wb in writers.items():
383 wb.remote_write(0, "%10d" % i)
385 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
387 children_of_storedir = set(os.listdir(storedir))
389 # Now store another one under another storageindex that has leading
390 # chars the same as the first storageindex.
391 already, writers = self.allocate(ss, "storageindey", [0], 10)
392 for i, wb in writers.items():
393 wb.remote_write(0, "%10d" % i)
395 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
397 new_children_of_storedir = set(os.listdir(storedir))
398 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
400 def test_remove_incoming(self):
401 ss = self.create("test_remove_incoming")
402 already, writers = self.allocate(ss, "vid", range(3), 10)
403 for i,wb in writers.items():
404 wb.remote_write(0, "%10d" % i)
406 incoming_share_dir = wb.incominghome
407 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
408 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
409 incoming_dir = os.path.dirname(incoming_prefix_dir)
410 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
411 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
412 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
414 def test_abort(self):
415 # remote_abort, when called on a writer, should make sure that
416 # the allocated size of the bucket is not counted by the storage
417 # server when accounting for space.
418 ss = self.create("test_abort")
419 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
420 self.failIfEqual(ss.allocated_size(), 0)
422 # Now abort the writers.
423 for writer in writers.itervalues():
424 writer.remote_abort()
425 self.failUnlessEqual(ss.allocated_size(), 0)
428 def test_allocate(self):
429 ss = self.create("test_allocate")
431 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
433 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
434 self.failUnlessEqual(already, set())
435 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
437 # while the buckets are open, they should not count as readable
438 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
441 for i,wb in writers.items():
442 wb.remote_write(0, "%25d" % i)
444 # aborting a bucket that was already closed is a no-op
447 # now they should be readable
448 b = ss.remote_get_buckets("allocate")
449 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
450 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
452 self.failUnlessIn("BucketReader", b_str)
453 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
455 # now if we ask about writing again, the server should offer those
456 # three buckets as already present. It should offer them even if we
457 # don't ask about those specific ones.
458 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
459 self.failUnlessEqual(already, set([0,1,2]))
460 self.failUnlessEqual(set(writers.keys()), set([3,4]))
462 # while those two buckets are open for writing, the server should
463 # refuse to offer them to uploaders
465 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
466 self.failUnlessEqual(already2, set([0,1,2]))
467 self.failUnlessEqual(set(writers2.keys()), set([5]))
469 # aborting the writes should remove the tempfiles
470 for i,wb in writers2.items():
472 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
473 self.failUnlessEqual(already2, set([0,1,2]))
474 self.failUnlessEqual(set(writers2.keys()), set([5]))
476 for i,wb in writers2.items():
478 for i,wb in writers.items():
481 def test_bad_container_version(self):
482 ss = self.create("test_bad_container_version")
483 a,w = self.allocate(ss, "si1", [0], 10)
484 w[0].remote_write(0, "\xff"*10)
487 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
490 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
493 ss.remote_get_buckets("allocate")
495 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
496 ss.remote_get_buckets, "si1")
497 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
499 def test_disconnect(self):
500 # simulate a disconnection
501 ss = self.create("test_disconnect")
502 canary = FakeCanary()
503 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
504 self.failUnlessEqual(already, set())
505 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
506 for (f,args,kwargs) in canary.disconnectors.values():
511 # that ought to delete the incoming shares
512 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
513 self.failUnlessEqual(already, set())
514 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
516 @mock.patch('allmydata.util.fileutil.get_disk_stats')
517 def test_reserved_space(self, mock_get_disk_stats):
519 mock_get_disk_stats.return_value = {
520 'free_for_nonroot': 15000,
521 'avail': max(15000 - reserved_space, 0),
524 ss = self.create("test_reserved_space", reserved_space=reserved_space)
525 # 15k available, 10k reserved, leaves 5k for shares
527 # a newly created and filled share incurs this much overhead, beyond
528 # the size we request.
530 LEASE_SIZE = 4+32+32+4
531 canary = FakeCanary(True)
532 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
533 self.failUnlessEqual(len(writers), 3)
534 # now the StorageServer should have 3000 bytes provisionally
535 # allocated, allowing only 2000 more to be claimed
536 self.failUnlessEqual(len(ss._active_writers), 3)
538 # allocating 1001-byte shares only leaves room for one
539 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
540 self.failUnlessEqual(len(writers2), 1)
541 self.failUnlessEqual(len(ss._active_writers), 4)
543 # we abandon the first set, so their provisional allocation should be
547 self.failUnlessEqual(len(ss._active_writers), 1)
548 # now we have a provisional allocation of 1001 bytes
550 # and we close the second set, so their provisional allocation should
551 # become real, long-term allocation, and grows to include the
553 for bw in writers2.values():
554 bw.remote_write(0, "a"*25)
559 self.failUnlessEqual(len(ss._active_writers), 0)
561 allocated = 1001 + OVERHEAD + LEASE_SIZE
563 # we have to manually increase available, since we're not doing real
565 mock_get_disk_stats.return_value = {
566 'free_for_nonroot': 15000 - allocated,
567 'avail': max(15000 - allocated - reserved_space, 0),
570 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
571 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
572 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
573 self.failUnlessEqual(len(writers3), 39)
574 self.failUnlessEqual(len(ss._active_writers), 39)
578 self.failUnlessEqual(len(ss._active_writers), 0)
579 ss.disownServiceParent()
583 basedir = self.workdir("test_seek_behavior")
584 fileutil.make_dirs(basedir)
585 filename = os.path.join(basedir, "testfile")
586 f = open(filename, "wb")
589 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
590 # files. mode="a" preserves previous contents but does not allow
591 # seeking-to-create-holes. mode="r+" allows both.
592 f = open(filename, "rb+")
596 filelen = os.stat(filename)[stat.ST_SIZE]
597 self.failUnlessEqual(filelen, 100+3)
598 f2 = open(filename, "rb")
599 self.failUnlessEqual(f2.read(5), "start")
602 def test_leases(self):
603 ss = self.create("test_leases")
604 canary = FakeCanary()
608 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
611 sharenums, size, canary)
612 self.failUnlessEqual(len(already), 0)
613 self.failUnlessEqual(len(writers), 5)
614 for wb in writers.values():
617 leases = list(ss.get_leases("si0"))
618 self.failUnlessEqual(len(leases), 1)
619 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
621 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
622 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
623 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
624 sharenums, size, canary)
625 for wb in writers.values():
628 # take out a second lease on si1
629 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
630 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
631 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
632 sharenums, size, canary)
633 self.failUnlessEqual(len(already), 5)
634 self.failUnlessEqual(len(writers), 0)
636 leases = list(ss.get_leases("si1"))
637 self.failUnlessEqual(len(leases), 2)
638 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
640 # and a third lease, using add-lease
641 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
642 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
643 ss.remote_add_lease("si1", rs2a, cs2a)
644 leases = list(ss.get_leases("si1"))
645 self.failUnlessEqual(len(leases), 3)
646 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
648 # add-lease on a missing storage index is silently ignored
649 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
651 # check that si0 is readable
652 readers = ss.remote_get_buckets("si0")
653 self.failUnlessEqual(len(readers), 5)
655 # renew the first lease. Only the proper renew_secret should work
656 ss.remote_renew_lease("si0", rs0)
657 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
658 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
660 # check that si0 is still readable
661 readers = ss.remote_get_buckets("si0")
662 self.failUnlessEqual(len(readers), 5)
664 # There is no such method as remote_cancel_lease for now -- see
666 self.failIf(hasattr(ss, 'remote_cancel_lease'), \
667 "ss should not have a 'remote_cancel_lease' method/attribute")
669 # test overlapping uploads
670 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
671 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
672 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
673 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
674 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
675 sharenums, size, canary)
676 self.failUnlessEqual(len(already), 0)
677 self.failUnlessEqual(len(writers), 5)
678 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
679 sharenums, size, canary)
680 self.failUnlessEqual(len(already2), 0)
681 self.failUnlessEqual(len(writers2), 0)
682 for wb in writers.values():
685 leases = list(ss.get_leases("si3"))
686 self.failUnlessEqual(len(leases), 1)
688 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
689 sharenums, size, canary)
690 self.failUnlessEqual(len(already3), 5)
691 self.failUnlessEqual(len(writers3), 0)
693 leases = list(ss.get_leases("si3"))
694 self.failUnlessEqual(len(leases), 2)
696 def test_readonly(self):
697 workdir = self.workdir("test_readonly")
698 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
699 ss.setServiceParent(self.sparent)
701 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
702 self.failUnlessEqual(already, set())
703 self.failUnlessEqual(writers, {})
705 stats = ss.get_stats()
706 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
707 if "storage_server.disk_avail" in stats:
708 # Some platforms may not have an API to get disk stats.
709 # But if there are stats, readonly_storage means disk_avail=0
710 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
712 def test_discard(self):
713 # discard is really only used for other tests, but we test it anyways
714 workdir = self.workdir("test_discard")
715 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
716 ss.setServiceParent(self.sparent)
718 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
719 self.failUnlessEqual(already, set())
720 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
721 for i,wb in writers.items():
722 wb.remote_write(0, "%25d" % i)
724 # since we discard the data, the shares should be present but sparse.
725 # Since we write with some seeks, the data we read back will be all
727 b = ss.remote_get_buckets("vid")
728 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
729 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
731 def test_advise_corruption(self):
732 workdir = self.workdir("test_advise_corruption")
733 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
734 ss.setServiceParent(self.sparent)
736 si0_s = base32.b2a("si0")
737 ss.remote_advise_corrupt_share("immutable", "si0", 0,
738 "This share smells funny.\n")
739 reportdir = os.path.join(workdir, "corruption-advisories")
740 reports = os.listdir(reportdir)
741 self.failUnlessEqual(len(reports), 1)
742 report_si0 = reports[0]
743 self.failUnlessIn(si0_s, report_si0)
744 f = open(os.path.join(reportdir, report_si0), "r")
747 self.failUnlessIn("type: immutable", report)
748 self.failUnlessIn("storage_index: %s" % si0_s, report)
749 self.failUnlessIn("share_number: 0", report)
750 self.failUnlessIn("This share smells funny.", report)
752 # test the RIBucketWriter version too
753 si1_s = base32.b2a("si1")
754 already,writers = self.allocate(ss, "si1", [1], 75)
755 self.failUnlessEqual(already, set())
756 self.failUnlessEqual(set(writers.keys()), set([1]))
757 writers[1].remote_write(0, "data")
758 writers[1].remote_close()
760 b = ss.remote_get_buckets("si1")
761 self.failUnlessEqual(set(b.keys()), set([1]))
762 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
764 reports = os.listdir(reportdir)
765 self.failUnlessEqual(len(reports), 2)
766 report_si1 = [r for r in reports if si1_s in r][0]
767 f = open(os.path.join(reportdir, report_si1), "r")
770 self.failUnlessIn("type: immutable", report)
771 self.failUnlessIn("storage_index: %s" % si1_s, report)
772 self.failUnlessIn("share_number: 1", report)
773 self.failUnlessIn("This share tastes like dust.", report)
777 class MutableServer(unittest.TestCase):
780 self.sparent = LoggingServiceParent()
781 self._lease_secret = itertools.count()
783 return self.sparent.stopService()
785 def workdir(self, name):
786 basedir = os.path.join("storage", "MutableServer", name)
789 def create(self, name):
790 workdir = self.workdir(name)
791 ss = StorageServer(workdir, "\x00" * 20)
792 ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: creating a StorageServer for mutable-share tests
        # must not raise.
        self.create("test_create")
798 def write_enabler(self, we_tag):
799 return hashutil.tagged_hash("we_blah", we_tag)
801 def renew_secret(self, tag):
802 return hashutil.tagged_hash("renew_blah", str(tag))
804 def cancel_secret(self, tag):
805 return hashutil.tagged_hash("cancel_blah", str(tag))
807 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
808 write_enabler = self.write_enabler(we_tag)
809 renew_secret = self.renew_secret(lease_tag)
810 cancel_secret = self.cancel_secret(lease_tag)
811 rstaraw = ss.remote_slot_testv_and_readv_and_writev
812 testandwritev = dict( [ (shnum, ([], [], None) )
813 for shnum in sharenums ] )
815 rc = rstaraw(storage_index,
816 (write_enabler, renew_secret, cancel_secret),
819 (did_write, readv_data) = rc
820 self.failUnless(did_write)
821 self.failUnless(isinstance(readv_data, dict))
822 self.failUnlessEqual(len(readv_data), 0)
824 def test_bad_magic(self):
825 ss = self.create("test_bad_magic")
826 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
827 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
832 read = ss.remote_slot_readv
833 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
834 read, "si1", [0], [(0,10)])
835 self.failUnlessIn(" had magic ", str(e))
836 self.failUnlessIn(" but we wanted ", str(e))
838 def test_container_size(self):
839 ss = self.create("test_container_size")
840 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
842 read = ss.remote_slot_readv
843 rstaraw = ss.remote_slot_testv_and_readv_and_writev
844 secrets = ( self.write_enabler("we1"),
845 self.renew_secret("we1"),
846 self.cancel_secret("we1") )
847 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
848 answer = rstaraw("si1", secrets,
849 {0: ([], [(0,data)], len(data)+12)},
851 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
853 # Trying to make the container too large (by sending a write vector
854 # whose offset is too high) will raise an exception.
855 TOOBIG = MutableShareFile.MAX_SIZE + 10
856 self.failUnlessRaises(DataTooLargeError,
857 rstaraw, "si1", secrets,
858 {0: ([], [(TOOBIG,data)], None)},
861 answer = rstaraw("si1", secrets,
862 {0: ([], [(0,data)], None)},
864 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
866 read_answer = read("si1", [0], [(0,10)])
867 self.failUnlessEqual(read_answer, {0: [data[:10]]})
869 # Sending a new_length shorter than the current length truncates the
871 answer = rstaraw("si1", secrets,
874 read_answer = read("si1", [0], [(0,10)])
875 self.failUnlessEqual(read_answer, {0: [data[:9]]})
877 # Sending a new_length longer than the current length doesn't change
879 answer = rstaraw("si1", secrets,
882 assert answer == (True, {0:[],1:[],2:[]})
883 read_answer = read("si1", [0], [(0, 20)])
884 self.failUnlessEqual(read_answer, {0: [data[:9]]})
886 # Sending a write vector whose start is after the end of the current
887 # data doesn't reveal "whatever was there last time" (palimpsest),
888 # but instead fills with zeroes.
890 # To test this, we fill the data area with a recognizable pattern.
891 pattern = ''.join([chr(i) for i in range(100)])
892 answer = rstaraw("si1", secrets,
893 {0: ([], [(0, pattern)], None)},
895 assert answer == (True, {0:[],1:[],2:[]})
896 # Then truncate the data...
897 answer = rstaraw("si1", secrets,
900 assert answer == (True, {0:[],1:[],2:[]})
901 # Just confirm that you get an empty string if you try to read from
902 # past the (new) endpoint now.
903 answer = rstaraw("si1", secrets,
906 self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))
908 # Then the extend the file by writing a vector which starts out past
910 answer = rstaraw("si1", secrets,
911 {0: ([], [(50, 'hellothere')], None)},
913 assert answer == (True, {0:[],1:[],2:[]})
914 # Now if you read the stuff between 20 (where we earlier truncated)
915 # and 50, it had better be all zeroes.
916 answer = rstaraw("si1", secrets,
919 self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))
921 # Also see if the server explicitly declares that it supports this
923 ver = ss.remote_get_version()
924 storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
925 self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
927 # If the size is dropped to zero the share is deleted.
928 answer = rstaraw("si1", secrets,
929 {0: ([], [(0,data)], 0)},
931 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
933 read_answer = read("si1", [0], [(0,10)])
934 self.failUnlessEqual(read_answer, {})
# Exercises slot allocation plus basic readv/writev on a mutable slot:
# freshly allocated shares read back as empty, a write round-trips, a bad
# write-enabler raises BadWriteEnablerError naming the original nodeid,
# and failed test vectors leave the stored data untouched.
# NOTE(review): elided listing -- the embedded source line numbers jump
# (e.g. 938->941, 975->982), so several statements are missing; do not
# treat this text as runnable.
936 def test_allocate(self):
937 ss = self.create("test_allocate")
938 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
941 read = ss.remote_slot_readv
# Reads of empty shares return empty strings, not errors.
942 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
944 self.failUnlessEqual(read("si1", [], [(0, 10)]),
945 {0: [""], 1: [""], 2: [""]})
946 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
950 secrets = ( self.write_enabler("we1"),
951 self.renew_secret("we1"),
952 self.cancel_secret("we1") )
# 100 bytes: "0000000000111111111122..." -- ten digits, ten repeats each.
953 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
954 write = ss.remote_slot_testv_and_readv_and_writev
955 answer = write("si1", secrets,
956 {0: ([], [(0,data)], None)},
958 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
960 self.failUnlessEqual(read("si1", [0], [(0,20)]),
961 {0: ["00000000001111111111"]})
962 self.failUnlessEqual(read("si1", [0], [(95,10)]),
964 #self.failUnlessEqual(s0.remote_get_length(), 100)
# A wrong write enabler must be rejected, and the error message must
# carry the nodeid that recorded it (supports share migration).
966 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
967 f = self.failUnlessRaises(BadWriteEnablerError,
968 write, "si1", bad_secrets,
970 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
972 # this testv should fail
973 answer = write("si1", secrets,
974 {0: ([(0, 12, "eq", "444444444444"),
975 (20, 5, "eq", "22222"),
982 self.failUnlessEqual(answer, (False,
983 {0: ["000000000011", "22222"],
# Failed test vectors must leave the previous contents intact.
987 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990 answer = write("si1", secrets,
991 {0: ([(10, 5, "lt", "11111"),
998 self.failUnlessEqual(answer, (False,
1003 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Exercises every test-vector comparison operator (lt, le, eq, ne, ge, gt)
# against the stored substring "11111": each operator is tried in both a
# failing and a passing form; a failed vector leaves the data unchanged,
# a passing one applies the write ("y"*100), and the data is reset after
# each case.
# NOTE(review): elided listing -- embedded source line numbers jump
# (e.g. 1020->1026), so the reset/write statements between cases are
# missing; do not treat this text as runnable.
1006 def test_operators(self):
1007 # test operators, the data we're comparing is '11111' in all cases.
1008 # test both fail+pass, reset data after each one.
1009 ss = self.create("test_operators")
1011 secrets = ( self.write_enabler("we1"),
1012 self.renew_secret("we1"),
1013 self.cancel_secret("we1") )
1014 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1015 write = ss.remote_slot_testv_and_readv_and_writev
1016 read = ss.remote_slot_readv
1019 write("si1", secrets,
1020 {0: ([], [(0,data)], None)},
# lt: fails against smaller and equal values, passes against larger.
1026 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
1031 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1032 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1033 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
1036 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1041 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1042 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1045 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1050 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1051 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# le: fails against smaller, passes against equal and larger.
1055 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1060 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1061 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1064 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1069 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1070 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1073 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1078 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1079 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# eq: only an exact match passes.
1083 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1088 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1089 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1092 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1097 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1098 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ne: exact match fails, anything else passes.
1102 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1107 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1108 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1111 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1116 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1117 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ge: passes against smaller and equal, fails against larger.
1121 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1126 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1127 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1130 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1135 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1136 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1139 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1144 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1145 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# gt: passes only against strictly smaller values.
1149 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1154 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1155 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1158 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1163 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1164 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1167 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1172 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1173 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1176 # finally, test some operators against empty shares
1177 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1182 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1183 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Writes three distinct 100-byte shares to one slot, then verifies that a
# readv with an empty share list ([]) returns data for every share.
# NOTE(review): elided listing -- embedded line numbers jump (1198->1200,
# 1203->1207), so the closing arguments and the remaining expected
# entries of the final assertion are missing.
1186 def test_readv(self):
1187 ss = self.create("test_readv")
1188 secrets = ( self.write_enabler("we1"),
1189 self.renew_secret("we1"),
1190 self.cancel_secret("we1") )
# First binding of `data` is immediately shadowed by the per-share list
# below; kept byte-identical to the original listing.
1191 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1192 write = ss.remote_slot_testv_and_readv_and_writev
1193 read = ss.remote_slot_readv
1194 data = [("%d" % i) * 100 for i in range(3)]
1195 rc = write("si1", secrets,
1196 {0: ([], [(0,data[0])], None),
1197 1: ([], [(0,data[1])], None),
1198 2: ([], [(0,data[2])], None),
1200 self.failUnlessEqual(rc, (True, {}))
1202 answer = read("si1", [], [(0, 10)])
1203 self.failUnlessEqual(answer, {0: ["0"*10],
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """Assert that two lease lists describe the same leases.

    Each corresponding pair is compared on its identity fields
    (owner_num, renew_secret, cancel_secret, nodeid) only; expiration
    times are deliberately ignored, so callers can compare leases that
    survived a container rewrite or renewal.

    Raises via self.failUnlessEqual on any mismatch (including a
    length mismatch).
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip pairs corresponding leases directly; the original listing's
    # index-based loop lost its `a = leases_a[i]` / `b = leases_b[i]`
    # bindings, so the pairing is restored here explicitly.
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """Assert that two lease lists are fully identical.

    Like compare_leases_without_timestamps, but additionally requires
    the expiration_time of each corresponding pair to match -- used
    where an operation must NOT have touched the lease timestamps.

    Raises via self.failUnlessEqual on any mismatch (including a
    length mismatch).
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip restores the pairing lost in the listing (the index-based
    # `a = leases_a[i]` / `b = leases_b[i]` bindings were elided).
    for a, b in zip(leases_a, leases_b):
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
# End-to-end lease behaviour on a mutable slot: rewriting with the same
# secrets reuses the lease; new secrets (via writev or add_lease) append
# leases; container growth preserves leases; renewal with a bogus secret
# raises IndexError naming the recording nodeid; plain reads never touch
# lease timestamps.
# NOTE(review): elided listing -- the `def secrets(n):` line introducing
# the inner helper (original line 1230) and other statements are missing
# (numbering jumps 1229->1231, 1245->1248, etc.); not runnable as shown.
1228 def test_leases(self):
1229 ss = self.create("test_leases")
# The three lines below are the body of an inner helper that derives a
# distinct (write-enabler, renew, cancel) triple per index n.
1231 return ( self.write_enabler("we1"),
1232 self.renew_secret("we1-%d" % n),
1233 self.cancel_secret("we1-%d" % n) )
1234 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1235 write = ss.remote_slot_testv_and_readv_and_writev
1236 read = ss.remote_slot_readv
1237 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1238 self.failUnlessEqual(rc, (True, {}))
1240 # create a random non-numeric file in the bucket directory, to
1241 # exercise the code that's supposed to ignore those.
1242 bucket_dir = os.path.join(self.workdir("test_leases"),
1243 "shares", storage_index_to_dir("si1"))
1244 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1245 f.write("you ought to be ignoring me\n")
1248 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1249 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1251 # add-lease on a missing storage index is silently ignored
1252 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1254 # re-allocate the slots and use the same secrets, that should update
1256 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1257 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1260 ss.remote_renew_lease("si1", secrets(0)[1])
1261 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1263 # now allocate them with a bunch of different secrets, to trigger the
1264 # extended lease code. Use add_lease for one of them.
1265 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1266 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1267 secrets2 = secrets(2)
1268 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1269 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1270 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1271 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1272 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1274 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1276 all_leases = list(s0.get_leases())
1277 # and write enough data to expand the container, forcing the server
1278 # to move the leases
1279 write("si1", secrets(0),
1280 {0: ([], [(0,data)], 200), },
1283 # read back the leases, make sure they're still intact.
1284 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1286 ss.remote_renew_lease("si1", secrets(0)[1])
1287 ss.remote_renew_lease("si1", secrets(1)[1])
1288 ss.remote_renew_lease("si1", secrets(2)[1])
1289 ss.remote_renew_lease("si1", secrets(3)[1])
1290 ss.remote_renew_lease("si1", secrets(4)[1])
1291 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1292 # get a new copy of the leases, with the current timestamps. Reading
1293 # data and failing to renew/cancel leases should leave the timestamps
1295 all_leases = list(s0.get_leases())
1296 # renewing with a bogus token should prompt an error message
1298 # examine the exception thus raised, make sure the old nodeid is
1299 # present, to provide for share migration
1300 e = self.failUnlessRaises(IndexError,
1301 ss.remote_renew_lease, "si1",
1304 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1305 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1306 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1308 self.compare_leases(all_leases, list(s0.get_leases()))
1310 # reading shares should not modify the timestamp
1311 read("si1", [], [(0,200)])
1312 self.compare_leases(all_leases, list(s0.get_leases()))
# Growing the container must carry the leases along (timestamps may
# legitimately differ here, hence the *_without_timestamps variant).
1314 write("si1", secrets(0),
1315 {0: ([], [(200, "make me bigger")], None)}, [])
1316 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1318 write("si1", secrets(0),
1319 {0: ([], [(500, "make me really bigger")], None)}, [])
1320 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
# Deletes shares one at a time by truncating each to size 0 via writev;
# after each deletion the remaining shares are still readable, and once
# all three are gone the bucket directory itself is removed (the prefix
# directory remains -- an implementation detail, as noted inline).
# NOTE(review): elided listing -- embedded line numbers jump
# (1332->1335, 1343->1346, etc.), so the writev argument lines and some
# expected read results are missing; not runnable as shown.
1322 def test_remove(self):
1323 ss = self.create("test_remove")
1324 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1326 readv = ss.remote_slot_readv
1327 writev = ss.remote_slot_testv_and_readv_and_writev
1328 secrets = ( self.write_enabler("we1"),
1329 self.renew_secret("we1"),
1330 self.cancel_secret("we1") )
1331 # delete sh0 by setting its size to zero
1332 answer = writev("si1", secrets,
1335 # the answer should mention all the shares that existed before the
1337 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1338 # but a new read should show only sh1 and sh2
1339 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1342 # delete sh1 by setting its size to zero
1343 answer = writev("si1", secrets,
1346 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1347 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1350 # delete sh2 by setting its size to zero
1351 answer = writev("si1", secrets,
1354 self.failUnlessEqual(answer, (True, {2:[]}) )
1355 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1357 # and the bucket directory should now be gone
1358 si = base32.b2a("si1")
1359 # note: this is a detail of the storage server implementation, and
1360 # may change in the future
# NOTE(review): the line defining `prefix` (original 1361) is elided.
1362 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1363 bucketdir = os.path.join(prefixdir, si)
1364 self.failUnless(os.path.exists(prefixdir), prefixdir)
1365 self.failIf(os.path.exists(bucketdir), bucketdir)
1368 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# Body of MDMFProxies.setUp: builds a StorageServer, wraps it in a fake
# RemoteBucket, and precomputes the fixture data (blocks, salts, hash
# trees, signature, keys) plus their serialized on-disk forms that the
# MDMF/SDMF share-building helpers below splice together.
# NOTE(review): the `def setUp(self):` line itself (original 1369) was
# dropped from this listing, as were lines 1379 and 1395.
1370 self.sparent = LoggingServiceParent()
1371 self._lease_secret = itertools.count()
1372 self.ss = self.create("MDMFProxies storage test server")
1373 self.rref = RemoteBucket()
1374 self.rref.target = self.ss
1375 self.secrets = (self.write_enabler("we_secret"),
1376 self.renew_secret("renew_secret"),
1377 self.cancel_secret("cancel_secret"))
1378 self.segment = "aaaaaa"
1380 self.salt = "a" * 16
1381 self.block_hash = "a" * 32
1382 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1383 self.share_hash = self.block_hash
1384 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1385 self.signature = "foobarbaz"
1386 self.verification_key = "vvvvvv"
1387 self.encprivkey = "private"
1388 self.root_hash = self.block_hash
1389 self.salt_hash = self.root_hash
1390 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1391 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1392 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1393 # blockhashes and salt hashes are serialized in the same way,
1394 # only we lop off the first element and store that in the
1396 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
# Body of MDMFProxies.tearDown: stops the service parent and removes the
# working directory created by create()/workdir().
# NOTE(review): the `def tearDown(self):` line (original 1399) was
# dropped from this listing; upstream likely also returns a deferred --
# confirm against the real source.
1400 self.sparent.stopService()
1401 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
def write_enabler(self, we_tag):
    """Derive the deterministic write-enabler secret for *we_tag*."""
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Derive the deterministic lease-renewal secret for *tag*.

    *tag* may be any object; it is stringified before hashing.
    """
    secret = hashutil.tagged_hash("renew_blah", str(tag))
    return secret
def cancel_secret(self, tag):
    """Derive the deterministic lease-cancel secret for *tag*.

    *tag* may be any object; it is stringified before hashing.
    """
    secret = hashutil.tagged_hash("cancel_blah", str(tag))
    return secret
def workdir(self, name):
    """Return the on-disk working directory for test *name*.

    NOTE(review): the path deliberately lives under
    storage/MutableServer (not an MDMF-specific directory); tearDown
    removes it via this same method, so the two stay consistent.
    """
    basedir = os.path.join("storage", "MutableServer", name)
    # The listing had dropped this return; callers (create, tearDown)
    # consume the returned path, so it is restored here.
    return basedir
def create(self, name):
    """Create, register, and return a StorageServer for test *name*.

    The server is rooted at workdir(name) and parented under
    self.sparent so tearDown's stopService() shuts it down.
    """
    workdir = self.workdir(name)
    # nodeid is 20 NUL bytes -- tests only need a deterministic identity
    ss = StorageServer(workdir, "\x00" * 20)
    ss.setServiceParent(self.sparent)
    # The listing had dropped this return; setUp assigns
    # `self.ss = self.create(...)`, so the server must be returned.
    return ss
# Hand-assembles a byte-exact MDMF share from the setUp fixture data:
# header checkstring, encoding parameters, offset table (computed from
# the fixed-size regions PRIVATE_KEY_SIZE / SIGNATURE_SIZE /
# VERIFICATION_KEY_SIZE / SHARE_HASH_CHAIN_SIZE), then the fields and
# share data, padding with spaces up to share_data_offset. Also records
# the computed offsets in self.offsets for later assertions.
# NOTE(review): elided listing -- many struct.pack argument lines,
# branch bodies (tail_segment/empty), and the final return are missing
# (numbering jumps 1430->1434, 1437->1443, 1492->1498, etc.).
1428 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1429 # Start with the checkstring
1430 data = struct.pack(">BQ32s",
1434 self.checkstring = data
1435 # Next, the encoding parameters
1437 data += struct.pack(">BBQQ",
1443 data += struct.pack(">BBQQ",
1449 data += struct.pack(">BBQQ",
1454 # Now we'll build the offsets.
1456 if not tail_segment and not empty:
1458 sharedata += self.salt + self.block
1461 sharedata += self.salt + self.block
1462 sharedata += self.salt + "a"
1464 # The encrypted private key comes after the shares + salts
1465 offset_size = struct.calcsize(MDMFOFFSETS)
1466 encrypted_private_key_offset = len(data) + offset_size
1467 # The share has chain comes after the private key
1468 sharehashes_offset = encrypted_private_key_offset + \
1469 len(self.encprivkey)
1471 # The signature comes after the share hash chain.
1472 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1474 verification_key_offset = signature_offset + len(self.signature)
1475 verification_key_end = verification_key_offset + \
1476 len(self.verification_key)
# share data starts after the fixed-size reserved regions.
1478 share_data_offset = offset_size
1479 share_data_offset += PRIVATE_KEY_SIZE
1480 share_data_offset += SIGNATURE_SIZE
1481 share_data_offset += VERIFICATION_KEY_SIZE
1482 share_data_offset += SHARE_HASH_CHAIN_SIZE
1484 blockhashes_offset = share_data_offset + len(sharedata)
1485 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1487 data += struct.pack(MDMFOFFSETS,
1488 encrypted_private_key_offset,
1491 verification_key_offset,
1492 verification_key_end,
1498 self.offsets['enc_privkey'] = encrypted_private_key_offset
1499 self.offsets['block_hash_tree'] = blockhashes_offset
1500 self.offsets['share_hash_chain'] = sharehashes_offset
1501 self.offsets['signature'] = signature_offset
1502 self.offsets['verification_key'] = verification_key_offset
1503 self.offsets['share_data'] = share_data_offset
1504 self.offsets['verification_key_end'] = verification_key_end
1505 self.offsets['EOF'] = eof_offset
1508 data += self.encprivkey
1510 data += self.share_hash_chain_s
1512 data += self.signature
1513 # and the verification key
1514 data += self.verification_key
1515 # Then we'll add in gibberish until we get to the right point.
1516 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1519 # Then the share data
1522 data += self.block_hash_tree_s
# Writes a hand-built MDMF share (from build_test_mdmf_share) into
# share 0 of the given storage index via a single writev, guarded by a
# test vector requiring the slot to be empty.
# NOTE(review): elided listing -- the remaining signature parameters
# (storage_index, tail_segment=..., empty=..., originally lines
# 1527-1530) and the `tws = {}` / `readv = []` setup lines are missing.
1526 def write_test_share_to_server(self,
1531 I write some data for the read tests to read to self.ss
1533 If tail_segment=True, then I will write a share that has a
1534 smaller tail segment than other segments.
1536 write = self.ss.remote_slot_testv_and_readv_and_writev
1537 data = self.build_test_mdmf_share(tail_segment, empty)
1538 # Finally, we write the whole thing to the storage server in one
# Test vector: first byte must equal "" -- i.e. the slot must be empty.
1540 testvs = [(0, 1, "eq", "")]
1542 tws[0] = (testvs, [(0, data)], None)
1544 results = write(storage_index, self.secrets, tws, readv)
1545 self.failUnless(results[0])
# Hand-assembles an SDMF share: prefix (version, seqnum, root hash,
# salt, encoding parameters), a >LLLLQQ offset table, then verification
# key, signature, share hash chain, block hash tree, share data, and
# encrypted private key. Records the offsets in self.offsets and the
# assembled share is the return value.
# NOTE(review): elided listing -- struct.pack argument lines, the
# `empty` branch, parts of the final_share join, and the return are
# missing (numbering jumps 1557->1567, 1574->1581, 1595->1599).
1548 def build_test_sdmf_share(self, empty=False):
1552 sharedata = self.segment * 6
1553 self.sharedata = sharedata
# SDMF here uses k=3: one block is a third of the share data.
1554 blocksize = len(sharedata) / 3
1555 block = sharedata[:blocksize]
1556 self.blockdata = block
1557 prefix = struct.pack(">BQ32s16s BBQQ",
1567 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1568 signature_offset = post_offset + len(self.verification_key)
1569 sharehashes_offset = signature_offset + len(self.signature)
1570 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1571 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1572 encprivkey_offset = sharedata_offset + len(block)
1573 eof_offset = encprivkey_offset + len(self.encprivkey)
1574 offsets = struct.pack(">LLLLQQ",
1581 final_share = "".join([prefix,
1583 self.verification_key,
1585 self.share_hash_chain_s,
1586 self.block_hash_tree_s,
1590 self.offsets['signature'] = signature_offset
1591 self.offsets['share_hash_chain'] = sharehashes_offset
1592 self.offsets['block_hash_tree'] = blockhashes_offset
1593 self.offsets['share_data'] = sharedata_offset
1594 self.offsets['enc_privkey'] = encprivkey_offset
1595 self.offsets['EOF'] = eof_offset
# Writes a hand-built SDMF share into share 0 of the given storage
# index, guarded by an empty-slot test vector -- used by tests that
# verify SDMF shares remain readable.
# NOTE(review): elided listing -- the remaining signature parameters
# (storage_index, empty=..., originally lines 1600-1601) and the
# `tws = {}` / `readv = []` setup lines are missing.
1599 def write_sdmf_share_to_server(self,
1602 # Some tests need SDMF shares to verify that we can still
1603 # read them. This method writes one, which resembles but is not
1605 write = self.ss.remote_slot_testv_and_readv_and_writev
1606 share = self.build_test_sdmf_share(empty)
1607 testvs = [(0, 1, "eq", "")]
1609 tws[0] = (testvs, [(0, share)], None)
1611 results = write(storage_index, self.secrets, tws, readv)
1612 self.failUnless(results[0])
# Reads back every field of a freshly written MDMF share through
# MDMFSlotReadProxy and checks each against the setUp fixtures: all six
# block/salt pairs, encprivkey, block hashes, share hashes, signature,
# verification key, seqnum, root hash, and encoding parameters
# (k=3, n=10, segsize=6, datalen=36).
# NOTE(review): elided listing -- loop headers, some proxy calls
# (get_signature/get_seqnum argument lines), and the final return are
# missing (numbering jumps 1622->1625, 1644->1646, etc.).
1615 def test_read(self):
1616 self.write_test_share_to_server("si1")
1617 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1618 # Check that every method equals what we expect it to.
1619 d = defer.succeed(None)
1620 def _check_block_and_salt((block, salt)):
1621 self.failUnlessEqual(block, self.block)
1622 self.failUnlessEqual(salt, self.salt)
1625 d.addCallback(lambda ignored, i=i:
1626 mr.get_block_and_salt(i))
1627 d.addCallback(_check_block_and_salt)
1629 d.addCallback(lambda ignored:
1630 mr.get_encprivkey())
1631 d.addCallback(lambda encprivkey:
1632 self.failUnlessEqual(self.encprivkey, encprivkey))
1634 d.addCallback(lambda ignored:
1635 mr.get_blockhashes())
1636 d.addCallback(lambda blockhashes:
1637 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1639 d.addCallback(lambda ignored:
1640 mr.get_sharehashes())
1641 d.addCallback(lambda sharehashes:
1642 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1644 d.addCallback(lambda ignored:
1646 d.addCallback(lambda signature:
1647 self.failUnlessEqual(signature, self.signature))
1649 d.addCallback(lambda ignored:
1650 mr.get_verification_key())
1651 d.addCallback(lambda verification_key:
1652 self.failUnlessEqual(verification_key, self.verification_key))
1654 d.addCallback(lambda ignored:
1656 d.addCallback(lambda seqnum:
1657 self.failUnlessEqual(seqnum, 0))
1659 d.addCallback(lambda ignored:
1661 d.addCallback(lambda root_hash:
1662 self.failUnlessEqual(self.root_hash, root_hash))
1664 d.addCallback(lambda ignored:
1666 d.addCallback(lambda seqnum:
1667 self.failUnlessEqual(0, seqnum))
1669 d.addCallback(lambda ignored:
1670 mr.get_encoding_parameters())
1671 def _check_encoding_parameters((k, n, segsize, datalen)):
1672 self.failUnlessEqual(k, 3)
1673 self.failUnlessEqual(n, 10)
1674 self.failUnlessEqual(segsize, 6)
1675 self.failUnlessEqual(datalen, 36)
1676 d.addCallback(_check_encoding_parameters)
1678 d.addCallback(lambda ignored:
1679 mr.get_checkstring())
# NOTE(review): the assertion below compares `checkstring` to itself and
# can never fail; presumably `self.checkstring` was intended as one of
# the operands -- verify against the upstream source.
1680 d.addCallback(lambda checkstring:
1681 self.failUnlessEqual(checkstring, checkstring))
# With tail_segment=True the last (6th) segment is written shorter than
# the others; reading segment 5 must return the 1-byte block "a".
# NOTE(review): elided listing -- the trailing `return d` (originally
# after line 1693) is missing.
1685 def test_read_with_different_tail_segment_size(self):
1686 self.write_test_share_to_server("si1", tail_segment=True)
1687 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1688 d = mr.get_block_and_salt(5)
1689 def _check_tail_segment(results):
1690 block, salt = results
1691 self.failUnlessEqual(len(block), 1)
1692 self.failUnlessEqual(block, "a")
1693 d.addCallback(_check_tail_segment)
# Asking the read proxy for segment 7 of a 6-segment share must fail
# with LayoutInvalid.
# NOTE(review): elided listing -- a shouldFail argument line (1703) and
# the trailing `return d` are missing.
1697 def test_get_block_with_invalid_segnum(self):
1698 self.write_test_share_to_server("si1")
1699 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1700 d = defer.succeed(None)
1701 d.addCallback(lambda ignored:
1702 self.shouldFail(LayoutInvalid, "test invalid segnum",
1704 mr.get_block_and_salt, 7))
# get_encoding_parameters must work as the very first call on a fresh
# read proxy (no prior fetch), returning the fixture values 3/10/6/36.
# NOTE(review): elided listing -- the trailing `return d` is missing.
1708 def test_get_encoding_parameters_first(self):
1709 self.write_test_share_to_server("si1")
1710 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1711 d = mr.get_encoding_parameters()
1712 def _check_encoding_parameters((k, n, segment_size, datalen)):
1713 self.failUnlessEqual(k, 3)
1714 self.failUnlessEqual(n, 10)
1715 self.failUnlessEqual(segment_size, 6)
1716 self.failUnlessEqual(datalen, 36)
1717 d.addCallback(_check_encoding_parameters)
# get_seqnum must work as the first call on a fresh read proxy and
# report sequence number 0 for a newly written share.
# NOTE(review): elided listing -- the `d = mr.get_seqnum()` line
# (original 1724) and the trailing `return d` are missing.
1721 def test_get_seqnum_first(self):
1722 self.write_test_share_to_server("si1")
1723 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1725 d.addCallback(lambda seqnum:
1726 self.failUnlessEqual(seqnum, 0))
# get_root_hash must work as the first call on a fresh read proxy and
# return the fixture root hash.
# NOTE(review): elided listing -- the trailing `return d` is missing.
1730 def test_get_root_hash_first(self):
1731 self.write_test_share_to_server("si1")
1732 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1733 d = mr.get_root_hash()
1734 d.addCallback(lambda root_hash:
1735 self.failUnlessEqual(root_hash, self.root_hash))
# get_checkstring must work as the first call on a fresh read proxy and
# match the checkstring recorded by build_test_mdmf_share (note this
# variant correctly compares against self.checkstring, unlike the
# tautological assertion at the end of test_read).
# NOTE(review): elided listing -- the trailing `return d` is missing.
1739 def test_get_checkstring_first(self):
1740 self.write_test_share_to_server("si1")
1741 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1742 d = mr.get_checkstring()
1743 d.addCallback(lambda checkstring:
1744 self.failUnlessEqual(checkstring, self.checkstring))
# A writev whose test vector fails returns the read vector alongside the
# failure; after publishing once, clearing the checkstring and publishing
# again must fail and hand back the old checkstring via readvs[0][0].
# NOTE(review): elided listing -- loop headers, the failure-branch
# assertions in _then_again, and the trailing statements are missing
# (numbering jumps 1754->1757, 1777->1779, 1783->1787).
1748 def test_write_read_vectors(self):
1749 # When writing for us, the storage server will return to us a
1750 # read vector, along with its result. If a write fails because
1751 # the test vectors failed, this read vector can help us to
1752 # diagnose the problem. This test ensures that the read vector
1753 # is working appropriately.
1754 mw = self._make_new_mw("si1", 0)
1757 mw.put_block(self.block, i, self.salt)
1758 mw.put_encprivkey(self.encprivkey)
1759 mw.put_blockhashes(self.block_hash_tree)
1760 mw.put_sharehashes(self.share_hash_chain)
1761 mw.put_root_hash(self.root_hash)
1762 mw.put_signature(self.signature)
1763 mw.put_verification_key(self.verification_key)
1764 d = mw.finish_publishing()
1766 self.failUnless(len(results), 2)
1767 result, readv = results
1768 self.failUnless(result)
1770 self.old_checkstring = mw.get_checkstring()
# Clearing the checkstring makes the next publish's test vector fail.
1771 mw.set_checkstring("")
1772 d.addCallback(_then)
1773 d.addCallback(lambda ignored:
1774 mw.finish_publishing())
1775 def _then_again(results):
1776 self.failUnlessEqual(len(results), 2)
1777 result, readvs = results
1779 self.failUnlessIn(0, readvs)
1780 readv = readvs[0][0]
1781 self.failUnlessEqual(readv, self.old_checkstring)
1782 d.addCallback(_then_again)
1783 # The checkstring remains the same for the rest of the process.
# The MDMF writer enforces field ordering: once the share hash chain has
# been written, putting the private key again must raise LayoutInvalid.
# NOTE(review): elided listing -- the block-writing loop header, a
# shouldFail argument line, and the trailing `return d` are missing.
1787 def test_private_key_after_share_hash_chain(self):
1788 mw = self._make_new_mw("si1", 0)
1789 d = defer.succeed(None)
1791 d.addCallback(lambda ignored, i=i:
1792 mw.put_block(self.block, i, self.salt))
1793 d.addCallback(lambda ignored:
1794 mw.put_encprivkey(self.encprivkey))
1795 d.addCallback(lambda ignored:
1796 mw.put_sharehashes(self.share_hash_chain))
1798 # Now try to put the private key again.
1799 d.addCallback(lambda ignored:
1800 self.shouldFail(LayoutInvalid, "test repeat private key",
1802 mw.put_encprivkey, self.encprivkey))
# Field-ordering enforcement: after the verification key has been
# written, putting the signature again must raise LayoutInvalid.
# NOTE(review): elided listing -- the block-writing loop header
# (original 1810), a shouldFail argument line, and the trailing
# `return d` are missing.
1806 def test_signature_after_verification_key(self):
1807 mw = self._make_new_mw("si1", 0)
1808 d = defer.succeed(None)
1809 # Put everything up to and including the verification key.
1811 d.addCallback(lambda ignored, i=i:
1812 mw.put_block(self.block, i, self.salt))
1813 d.addCallback(lambda ignored:
1814 mw.put_encprivkey(self.encprivkey))
1815 d.addCallback(lambda ignored:
1816 mw.put_blockhashes(self.block_hash_tree))
1817 d.addCallback(lambda ignored:
1818 mw.put_sharehashes(self.share_hash_chain))
1819 d.addCallback(lambda ignored:
1820 mw.put_root_hash(self.root_hash))
1821 d.addCallback(lambda ignored:
1822 mw.put_signature(self.signature))
1823 d.addCallback(lambda ignored:
1824 mw.put_verification_key(self.verification_key))
1825 # Now try to put the signature again. This should fail
1826 d.addCallback(lambda ignored:
1827 self.shouldFail(LayoutInvalid, "signature after verification",
1829 mw.put_signature, self.signature))
# Two writers on the same (storage index, share): the first publish
# succeeds; the second, whose checkstring is now stale, must fail --
# the server-side test vectors provide uncoordinated-write detection.
# NOTE(review): elided listing -- the failure-branch assertions in
# _check_failure, the block-writing loop header, the second
# _write_share call, and the trailing `return d` are missing.
1833 def test_uncoordinated_write(self):
1834 # Make two mutable writers, both pointing to the same storage
1835 # server, both at the same storage index, and try writing to the
1837 mw1 = self._make_new_mw("si1", 0)
1838 mw2 = self._make_new_mw("si1", 0)
1840 def _check_success(results):
1841 result, readvs = results
1842 self.failUnless(result)
1844 def _check_failure(results):
1845 result, readvs = results
1848 def _write_share(mw):
1850 mw.put_block(self.block, i, self.salt)
1851 mw.put_encprivkey(self.encprivkey)
1852 mw.put_blockhashes(self.block_hash_tree)
1853 mw.put_sharehashes(self.share_hash_chain)
1854 mw.put_root_hash(self.root_hash)
1855 mw.put_signature(self.signature)
1856 mw.put_verification_key(self.verification_key)
1857 return mw.finish_publishing()
1858 d = _write_share(mw1)
1859 d.addCallback(_check_success)
1860 d.addCallback(lambda ignored:
1862 d.addCallback(_check_failure)
# Salts must be exactly 16 bytes; put_block with a 17- or 15-byte salt
# must raise LayoutInvalid.
# NOTE(review): elided listing -- shouldFail argument lines (1875, 1879)
# and the trailing `return d` are missing.
1866 def test_invalid_salt_size(self):
1867 # Salts need to be 16 bytes in size. Writes that attempt to
1868 # write more or less than this should be rejected.
1869 mw = self._make_new_mw("si1", 0)
1870 invalid_salt = "a" * 17 # 17 bytes
1871 another_invalid_salt = "b" * 15 # 15 bytes
1872 d = defer.succeed(None)
1873 d.addCallback(lambda ignored:
1874 self.shouldFail(LayoutInvalid, "salt too big",
1876 mw.put_block, self.block, 0, invalid_salt))
1877 d.addCallback(lambda ignored:
1878 self.shouldFail(LayoutInvalid, "salt too small",
1880 mw.put_block, self.block, 0,
1881 another_invalid_salt))
# Publishing with a deliberately wrong checkstring ("this is a lie")
# must fail; resetting the checkstring to "" (empty slot) and publishing
# again must succeed.
# NOTE(review): elided listing -- the assertion bodies of
# _check_failure/_check_success, the block-writing loop header, and the
# trailing `return d` are missing (numbering jumps 1890->1894,
# 1900->1902, 1915->1919).
1885 def test_write_test_vectors(self):
1886 # If we give the write proxy a bogus test vector at
1887 # any point during the process, it should fail to write when we
1889 def _check_failure(results):
1890 self.failUnlessEqual(len(results), 2)
1894 def _check_success(results):
1895 self.failUnlessEqual(len(results), 2)
1897 self.failUnless(results)
1899 mw = self._make_new_mw("si1", 0)
1900 mw.set_checkstring("this is a lie")
1902 mw.put_block(self.block, i, self.salt)
1903 mw.put_encprivkey(self.encprivkey)
1904 mw.put_blockhashes(self.block_hash_tree)
1905 mw.put_sharehashes(self.share_hash_chain)
1906 mw.put_root_hash(self.root_hash)
1907 mw.put_signature(self.signature)
1908 mw.put_verification_key(self.verification_key)
1909 d = mw.finish_publishing()
1910 d.addCallback(_check_failure)
1911 d.addCallback(lambda ignored:
1912 mw.set_checkstring(""))
1913 d.addCallback(lambda ignored:
1914 mw.finish_publishing())
1915 d.addCallback(_check_success)
def serialize_blockhashes(self, blockhashes):
    """Return the hashes in *blockhashes* concatenated into one string.

    This matches how MDMF lays the block hash tree out on disk: hash
    values stored back-to-back with no delimiters.
    """
    pieces = list(blockhashes)
    return "".join(pieces)
def serialize_sharehashes(self, sharehashes):
    """Serialize a {share_num: hash} dict into the on-disk share hash
    chain format: consecutive big-endian (uint16 share number, 32-byte
    hash) records, ordered by share number.
    """
    ret = "".join([struct.pack(">H32s", i, sharehashes[i])
                   for i in sorted(sharehashes.keys())])
    # The listing had dropped this return; setUp consumes the value as
    # self.share_hash_chain_s, so it is restored here.
    return ret
# Publishes a full MDMF share through the write proxy, then verifies the
# on-disk layout byte-for-byte with raw remote_slot_readv calls: each
# region (share data, encprivkey, hash trees, signature, verification
# key) at its computed offset, the signable header fields, and every
# entry of the offset table at its fixed header position (59..115).
# NOTE(review): elided listing -- loop headers, some continuation lines
# of multi-line expressions, expected-value arguments (e.g. after lines
# 2013/2016), and the trailing `return d` are missing; the embedded
# numbering jumps throughout (1930->1932, 1942->1944, 1956->1958, ...).
1929 def test_write(self):
1930 # This translates to a file with 6 6-byte segments, and with 2-byte
1932 mw = self._make_new_mw("si1", 0)
1933 # Test writing some blocks.
1934 read = self.ss.remote_slot_readv
1935 expected_private_key_offset = struct.calcsize(MDMFHEADER)
# Share data starts after the fixed-size reserved regions that follow
# the header (see build_test_mdmf_share for the same arithmetic).
1936 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1937 PRIVATE_KEY_SIZE + \
1939 VERIFICATION_KEY_SIZE + \
1940 SHARE_HASH_CHAIN_SIZE
1941 written_block_size = 2 + len(self.salt)
1942 written_block = self.block + self.salt
1944 mw.put_block(self.block, i, self.salt)
1946 mw.put_encprivkey(self.encprivkey)
1947 mw.put_blockhashes(self.block_hash_tree)
1948 mw.put_sharehashes(self.share_hash_chain)
1949 mw.put_root_hash(self.root_hash)
1950 mw.put_signature(self.signature)
1951 mw.put_verification_key(self.verification_key)
1952 d = mw.finish_publishing()
1953 def _check_publish(results):
1954 self.failUnlessEqual(len(results), 2)
1955 result, ign = results
1956 self.failUnless(result, "publish failed")
1958 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1959 {0: [written_block]})
1961 self.failUnlessEqual(len(self.encprivkey), 7)
1962 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1963 {0: [self.encprivkey]})
1965 expected_block_hash_offset = expected_sharedata_offset + \
1966 (6 * written_block_size)
1967 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1968 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1969 {0: [self.block_hash_tree_s]})
1971 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1972 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1973 {0: [self.share_hash_chain_s]})
# Root hash sits at byte 9 of the header (after 1-byte version and
# 8-byte seqnum).
1975 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1976 {0: [self.root_hash]})
1977 expected_signature_offset = expected_share_hash_offset + \
1978 len(self.share_hash_chain_s)
1979 self.failUnlessEqual(len(self.signature), 9)
1980 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1981 {0: [self.signature]})
1983 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1984 self.failUnlessEqual(len(self.verification_key), 6)
1985 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1986 {0: [self.verification_key]})
1988 signable = mw.get_signable()
1989 verno, seq, roothash, k, n, segsize, datalen = \
1990 struct.unpack(">BQ32sBBQQ",
1992 self.failUnlessEqual(verno, 1)
1993 self.failUnlessEqual(seq, 0)
1994 self.failUnlessEqual(roothash, self.root_hash)
1995 self.failUnlessEqual(k, 3)
1996 self.failUnlessEqual(n, 10)
1997 self.failUnlessEqual(segsize, 6)
1998 self.failUnlessEqual(datalen, 36)
1999 expected_eof_offset = expected_block_hash_offset + \
2000 len(self.block_hash_tree_s)
2002 # Check the version number to make sure that it is correct.
2003 expected_version_number = struct.pack(">B", 1)
2004 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2005 {0: [expected_version_number]})
2006 # Check the sequence number to make sure that it is correct
2007 expected_sequence_number = struct.pack(">Q", 0)
2008 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2009 {0: [expected_sequence_number]})
2010 # Check that the encoding parameters (k, N, segement size, data
2011 # length) are what they should be. These are 3, 10, 6, 36
2012 expected_k = struct.pack(">B", 3)
2013 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2015 expected_n = struct.pack(">B", 10)
2016 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2018 expected_segment_size = struct.pack(">Q", 6)
2019 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2020 {0: [expected_segment_size]})
2021 expected_data_length = struct.pack(">Q", 36)
2022 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2023 {0: [expected_data_length]})
# Offset table entries: 8-byte big-endian values at fixed header
# positions 59, 67, 75, 83, 91, 99, 107, 115.
2024 expected_offset = struct.pack(">Q", expected_private_key_offset)
2025 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2026 {0: [expected_offset]})
2027 expected_offset = struct.pack(">Q", expected_share_hash_offset)
2028 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2029 {0: [expected_offset]})
2030 expected_offset = struct.pack(">Q", expected_signature_offset)
2031 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2032 {0: [expected_offset]})
2033 expected_offset = struct.pack(">Q", expected_verification_key_offset)
2034 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2035 {0: [expected_offset]})
2036 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2037 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2038 {0: [expected_offset]})
2039 expected_offset = struct.pack(">Q", expected_sharedata_offset)
2040 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2041 {0: [expected_offset]})
2042 expected_offset = struct.pack(">Q", expected_block_hash_offset)
2043 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2044 {0: [expected_offset]})
2045 expected_offset = struct.pack(">Q", expected_eof_offset)
2046 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2047 {0: [expected_offset]})
2048 d.addCallback(_check_publish)
def _make_new_mw(self, si, share, datalength=36):
    """Build an MDMFSlotWriteProxy for share number `share` of slot `si`.

    `datalength` defaults to 36 bytes, the size of the test fixture file.
    """
    # This is a file of size 36 bytes. Since it has a segment
    # size of 6, we know that it has 6 byte segments, which will
    # be split into blocks of 2 bytes because our FEC k
    # NOTE(review): the rest of the constructor arguments and the
    # `return mw` appear to be missing from this copy of the file --
    # restore from upstream before running.
    mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
def test_write_rejected_with_too_many_blocks(self):
    """Writing more blocks than the share layout allows must fail."""
    mw = self._make_new_mw("si0", 0)
    # Try writing too many blocks. We should not be able to write
    # blocks into each share.
    d = defer.succeed(None)
    # NOTE(review): the loop header that binds `i` (presumably
    # `for i in xrange(6):`) appears to be missing here -- confirm
    # against upstream; also the `None,` whofail argument and the
    # trailing `return d` look dropped.
    d.addCallback(lambda ignored, i=i:
                  mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "too many blocks",
                                  mw.put_block, self.block, 7, self.salt))
def test_write_rejected_with_invalid_salt(self):
    """put_block must reject a salt that is not exactly 16 bytes long."""
    # Try writing an invalid salt. Salts are 16 bytes -- any more or
    # less should cause an error.
    mw = self._make_new_mw("si1", 0)
    bad_salt = "a" * 17 # 17 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "test_invalid_salt",
                                  None, mw.put_block, self.block, 7, bad_salt))
    # NOTE(review): the trailing `return d` appears to be missing.
def test_write_rejected_with_invalid_root_hash(self):
    """put_root_hash must reject hashes that are not 32 bytes (SHA256d)."""
    # Try writing an invalid root hash. This should be SHA256d, and
    # 32 bytes long as a result.
    mw = self._make_new_mw("si2", 0)
    # 17 bytes != 32 bytes
    invalid_root_hash = "a" * 17
    d = defer.succeed(None)
    # Before this test can work, we need to put some blocks + salts,
    # a block hash tree, and a share hash tree. Otherwise, we'll see
    # failures that match what we are looking for, but are caused by
    # the constraints imposed on operation ordering.
    # NOTE(review): the loop header binding `i` (presumably
    # `for i in xrange(6):`) appears to be missing here.
    d.addCallback(lambda ignored, i=i:
                  mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
                  mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
                  mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
                  mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "invalid root hash",
                                  None, mw.put_root_hash, invalid_root_hash))
def test_write_rejected_with_invalid_blocksize(self):
    """Blocks of the wrong size are rejected, except a shorter tail block."""
    # The blocksize implied by the writer that we get from
    # _make_new_mw is 2bytes -- any more or any less than this
    # should be cause for failure, unless it is the tail segment, in
    # which case it may not be failure.
    mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
    # NOTE(review): the definitions of `invalid_block` ("a") and
    # `valid_block`, plus several argument lines, appear to be
    # missing from this copy -- restore from upstream.
    # 1 bytes != 2 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored, invalid_block=invalid_block:
                  self.shouldFail(LayoutInvalid, "test blocksize too small",
                                  None, mw.put_block, invalid_block, 0,
    invalid_block = invalid_block * 3
    # 3 bytes != 2 bytes
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "test blocksize too large",
                                  mw.put_block, invalid_block, 0, self.salt))
    # NOTE(review): loop header binding `i` appears missing here too.
    d.addCallback(lambda ignored, i=i:
                  mw.put_block(self.block, i, self.salt))
    # Try to put an invalid tail segment
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "test invalid tail segment",
                                  mw.put_block, self.block, 5, self.salt))
    d.addCallback(lambda ignored:
                  mw.put_block(valid_block, 5, self.salt))
def test_write_enforces_order_constraints(self):
    """MDMFSlotWriteProxy rejects operations performed out of order."""
    # We require that the MDMFSlotWriteProxy be interacted with in a
    # 1: write blocks and salts
    # 2: Write the encrypted private key
    # 3: Write the block hashes
    # 4: Write the share hashes
    # 5: Write the root hash and salt hash
    # 6: Write the signature and verification key
    # 7: Write the file.
    # Some of these can be performed out-of-order, and some can't.
    # The dependencies that I want to test here are:
    #  - Private key before block hashes
    #  - share hashes and block hashes before root hash
    #  - root hash before signature
    #  - signature before verification key
    mw0 = self._make_new_mw("si0", 0)
    d = defer.succeed(None)
    # NOTE(review): the loop header binding `i` appears missing here.
    d.addCallback(lambda ignored, i=i:
                  mw0.put_block(self.block, i, self.salt))
    # Try to write the share hash chain without writing the
    # encrypted private key
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "share hash chain before "
                                  mw0.put_sharehashes, self.share_hash_chain))
    # Write the private key.
    d.addCallback(lambda ignored:
                  mw0.put_encprivkey(self.encprivkey))
    # Now write the block hashes and try again
    d.addCallback(lambda ignored:
                  mw0.put_blockhashes(self.block_hash_tree))
    # We haven't yet put the root hash on the share, so we shouldn't
    # be able to sign it.
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "signature before root hash",
                                  None, mw0.put_signature, self.signature))
    d.addCallback(lambda ignored:
                  self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
    # ..and, since that fails, we also shouldn't be able to put the
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "key before signature",
                                  None, mw0.put_verification_key,
                                  self.verification_key))
    # Now write the share hashes.
    d.addCallback(lambda ignored:
                  mw0.put_sharehashes(self.share_hash_chain))
    # We should be able to write the root hash now too
    d.addCallback(lambda ignored:
                  mw0.put_root_hash(self.root_hash))
    # We should still be unable to put the verification key
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "key before signature",
                                  None, mw0.put_verification_key,
                                  self.verification_key))
    d.addCallback(lambda ignored:
                  mw0.put_signature(self.signature))
    # We shouldn't be able to write the offsets to the remote server
    # until the offset table is finished; IOW, until we have written
    # the verification key.
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "offsets before verification key",
                                  mw0.finish_publishing))
    d.addCallback(lambda ignored:
                  mw0.put_verification_key(self.verification_key))
def test_end_to_end(self):
    """Write a full MDMF share, then read every field back and compare."""
    mw = self._make_new_mw("si1", 0)
    # Write a share using the mutable writer, and make sure that the
    # reader knows how to read everything back to us.
    d = defer.succeed(None)
    # NOTE(review): loop header binding `i` appears missing here.
    d.addCallback(lambda ignored, i=i:
                  mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
                  mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
                  mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
                  mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
                  mw.put_root_hash(self.root_hash))
    d.addCallback(lambda ignored:
                  mw.put_signature(self.signature))
    d.addCallback(lambda ignored:
                  mw.put_verification_key(self.verification_key))
    d.addCallback(lambda ignored:
                  mw.finish_publishing())
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    def _check_block_and_salt((block, salt)):
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)
    # NOTE(review): loop header binding `i` appears missing here as well.
    d.addCallback(lambda ignored, i=i:
                  mr.get_block_and_salt(i))
    d.addCallback(_check_block_and_salt)
    d.addCallback(lambda ignored:
                  mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
                  self.failUnlessEqual(self.encprivkey, encprivkey))
    d.addCallback(lambda ignored:
                  mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
                  self.failUnlessEqual(self.block_hash_tree, blockhashes))
    d.addCallback(lambda ignored:
                  mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
                  self.failUnlessEqual(self.share_hash_chain, sharehashes))
    # NOTE(review): the lambda bodies (e.g. `mr.get_signature()`,
    # `mr.get_seqnum()`, `mr.get_root_hash()`) appear missing below.
    d.addCallback(lambda ignored:
    d.addCallback(lambda signature:
                  self.failUnlessEqual(signature, self.signature))
    d.addCallback(lambda ignored:
                  mr.get_verification_key())
    d.addCallback(lambda verification_key:
                  self.failUnlessEqual(verification_key, self.verification_key))
    d.addCallback(lambda ignored:
    d.addCallback(lambda seqnum:
                  self.failUnlessEqual(seqnum, 0))
    d.addCallback(lambda ignored:
    d.addCallback(lambda root_hash:
                  self.failUnlessEqual(self.root_hash, root_hash))
    d.addCallback(lambda ignored:
                  mr.get_encoding_parameters())
    def _check_encoding_parameters((k, n, segsize, datalen)):
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)
    d.addCallback(lambda ignored:
                  mr.get_checkstring())
    d.addCallback(lambda checkstring:
                  self.failUnlessEqual(checkstring, mw.get_checkstring()))
def test_is_sdmf(self):
    """MDMFSlotReadProxy.is_sdmf reports True for an SDMF share."""
    # The MDMFSlotReadProxy should also know how to read SDMF files,
    # since it will encounter them on the grid. Callers use the
    # is_sdmf method to test this.
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # NOTE(review): the line creating `d` (presumably `d = mr.is_sdmf()`)
    # appears to be missing from this copy.
    d.addCallback(lambda issdmf:
                  self.failUnless(issdmf))
def test_reads_sdmf(self):
    """The read proxy returns correct field values for an SDMF share."""
    # The slot read proxy should, naturally, know how to tell us
    # about data in the SDMF format
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): several lambda bodies (e.g. `mr.is_sdmf()`,
    # `mr.get_signature()`, `mr.get_seqnum()`, `mr.get_root_hash()`)
    # and some closing argument lines appear missing in this copy.
    d.addCallback(lambda ignored:
    d.addCallback(lambda issdmf:
                  self.failUnless(issdmf))
    # What do we need to read?
    d.addCallback(lambda ignored:
                  mr.get_block_and_salt(0))
    def _check_block_and_salt(results):
        block, salt = results
        # Our original file is 36 bytes long. Then each share is 12
        # bytes in size. The share is composed entirely of the
        # letter a. self.block contains 2 as, so 6 * self.block is
        # what we are looking for.
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
    d.addCallback(_check_block_and_salt)
    d.addCallback(lambda ignored:
                  mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
                  self.failUnlessEqual(self.block_hash_tree,
    d.addCallback(lambda ignored:
                  mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
                  self.failUnlessEqual(self.share_hash_chain,
    d.addCallback(lambda ignored:
                  mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
                  self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
    d.addCallback(lambda ignored:
                  mr.get_verification_key())
    d.addCallback(lambda verification_key:
                  self.failUnlessEqual(verification_key,
                                       self.verification_key,
    d.addCallback(lambda ignored:
    d.addCallback(lambda signature:
                  self.failUnlessEqual(signature, self.signature, signature))
    # - The sequence number
    d.addCallback(lambda ignored:
    d.addCallback(lambda seqnum:
                  self.failUnlessEqual(seqnum, 0, seqnum))
    d.addCallback(lambda ignored:
    d.addCallback(lambda root_hash:
                  self.failUnlessEqual(root_hash, self.root_hash, root_hash))
def test_only_reads_one_segment_sdmf(self):
    """Asking for segment 1 of a single-segment SDMF share must fail."""
    # SDMF shares have only one segment, so it doesn't make sense to
    # read more segments than that. The reader should know this and
    # complain if we try to do that.
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): the `mr.is_sdmf()` lambda body and the `None,`
    # shouldFail argument line appear missing from this copy.
    d.addCallback(lambda ignored:
    d.addCallback(lambda issdmf:
                  self.failUnless(issdmf))
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "test bad segment",
                                  mr.get_block_and_salt, 1))
def test_read_with_prefetched_mdmf_data(self):
    """Prefetched MDMF header data avoids remote reads for verinfo."""
    # The MDMFSlotReadProxy will prefill certain fields if you pass
    # it data that you have already fetched. This is useful for
    # cases like the Servermap, which prefetches ~2kb of data while
    # finding out which shares are on the remote peer so that it
    # doesn't waste round trips.
    mdmf_data = self.build_test_mdmf_share()
    self.write_test_share_to_server("si1")
    def _make_mr(ignored, length):
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
        # NOTE(review): a `return mr` appears to be missing here.
    d = defer.succeed(None)
    # This should be enough to fill in both the encoding parameters
    # and the table of offsets, which will complete the version
    # information tuple.
    d.addCallback(_make_mr, 123)
    # NOTE(review): the lambda body (presumably `mr.get_verinfo()`)
    # and the verinfo tuple-unpacking lines appear missing below.
    d.addCallback(lambda mr:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
        self.failUnlessEqual(expected_prefix, prefix)
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)
    # This is not enough data to read a block and a share, so the
    # wrapper should attempt to read this from the remote server.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
                  mr.get_block_and_salt(0))
    def _check_block_and_salt((block, salt)):
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(self.rref.read_count, 1)
    # This should be enough data to read one block.
    d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
    d.addCallback(lambda mr:
                  mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
def test_read_with_prefetched_sdmf_data(self):
    """Prefetched SDMF header data avoids remote reads for verinfo."""
    sdmf_data = self.build_test_sdmf_share()
    self.write_sdmf_share_to_server("si1")
    def _make_mr(ignored, length):
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
        # NOTE(review): a `return mr` appears to be missing here.
    d = defer.succeed(None)
    # This should be enough to get us the encoding parameters,
    # offset table, and everything else we need to build a verinfo
    d.addCallback(_make_mr, 123)
    # NOTE(review): the lambda body (presumably `mr.get_verinfo()`)
    # and the verinfo tuple-unpacking lines appear missing below.
    d.addCallback(lambda mr:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(SIGNED_PREFIX,
        self.failUnlessEqual(expected_prefix, prefix)
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)
    # This shouldn't be enough to read any share data.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
                  mr.get_block_and_salt(0))
    def _check_block_and_salt((block, salt)):
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
        # TODO: Fix the read routine so that it reads only the data
        #       that it has cached if it can't read all of it.
        self.failUnlessEqual(self.rref.read_count, 2)
    # This should be enough to read share data.
    d.addCallback(_make_mr, self.offsets['share_data'])
    d.addCallback(lambda mr:
                  mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
def test_read_with_empty_mdmf_file(self):
    """An empty MDMF share yields zero-size params and no readable block."""
    # Some tests upload a file with no contents to test things
    # unrelated to the actual handling of the content of the file.
    # The reader should behave intelligently in these cases.
    self.write_test_share_to_server("si1", empty=True)
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the encoding parameters, and they
    # should be correct.
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
                  mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)
    # We should not be able to fetch a block, since there are no
    # NOTE(review): the `None,` shouldFail argument line appears missing.
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "get block on empty file",
                                  mr.get_block_and_salt, 0))
def test_read_with_empty_sdmf_file(self):
    """An empty SDMF share yields zero-size params and no readable block."""
    self.write_sdmf_share_to_server("si1", empty=True)
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the encoding parameters, and they
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
                  mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)
    # It does not make sense to get a block in this format, so we
    # should not be able to.
    # NOTE(review): the `None,` shouldFail argument line appears missing.
    d.addCallback(lambda ignored:
                  self.shouldFail(LayoutInvalid, "get block on an empty file",
                                  mr.get_block_and_salt, 0))
def test_verinfo_with_sdmf_file(self):
    """get_verinfo on an SDMF share returns the expected 9-tuple."""
    self.write_sdmf_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the version information.
    d = defer.succeed(None)
    # NOTE(review): the lambda body (presumably `mr.get_verinfo()`)
    # and the verinfo tuple-unpacking lines appear missing below.
    d.addCallback(lambda ignored:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(">BQ32s16s BBQQ",
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
def test_verinfo_with_mdmf_file(self):
    """get_verinfo on an MDMF share returns the expected 9-tuple."""
    self.write_test_share_to_server("si1")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    # NOTE(review): the lambda body (presumably `mr.get_verinfo()`)
    # and the verinfo tuple-unpacking lines appear missing below.
    d.addCallback(lambda ignored:
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(">BQ32s BBQQ",
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
def test_sdmf_writer(self):
    """SDMFSlotWriteProxy buffers all puts, then writes in one call."""
    # Go through the motions of writing an SDMF share to the storage
    # server. Then read the storage server to see that the share got
    # written in the way that we think it should have.
    # We do this first so that the necessary instance variables get
    # set the way we want them for the tests below.
    data = self.build_test_sdmf_share()
    # NOTE(review): the remaining SDMFSlotWriteProxy constructor
    # arguments appear to be missing from this copy.
    sdmfr = SDMFSlotWriteProxy(0,
    # Put the block and salt.
    sdmfr.put_block(self.blockdata, 0, self.salt)
    # Put the encprivkey
    sdmfr.put_encprivkey(self.encprivkey)
    # Put the block and share hash chains
    sdmfr.put_blockhashes(self.block_hash_tree)
    sdmfr.put_sharehashes(self.share_hash_chain)
    sdmfr.put_root_hash(self.root_hash)
    sdmfr.put_signature(self.signature)
    # Put the verification key
    sdmfr.put_verification_key(self.verification_key)
    # Now check to make sure that nothing has been written yet.
    self.failUnlessEqual(self.rref.write_count, 0)
    # Now finish publishing
    d = sdmfr.finish_publishing()
    # NOTE(review): a `def _then(results):` header appears to be
    # missing before the indented assertions below.
        self.failUnlessEqual(self.rref.write_count, 1)
        read = self.ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
    d.addCallback(_then)
def test_sdmf_writer_preexisting_share(self):
    """Overwriting an existing SDMF share requires the right checkstring."""
    data = self.build_test_sdmf_share()
    self.write_sdmf_share_to_server("si1")
    # Now there is a share on the storage server. To successfully
    # write, we need to set the checkstring correctly. When we
    # don't, no write should occur.
    # NOTE(review): the remaining SDMFSlotWriteProxy constructor
    # arguments appear to be missing from this copy.
    sdmfw = SDMFSlotWriteProxy(0,
    sdmfw.put_block(self.blockdata, 0, self.salt)
    # Put the encprivkey
    sdmfw.put_encprivkey(self.encprivkey)
    # Put the block and share hash chains
    sdmfw.put_blockhashes(self.block_hash_tree)
    sdmfw.put_sharehashes(self.share_hash_chain)
    sdmfw.put_root_hash(self.root_hash)
    sdmfw.put_signature(self.signature)
    # Put the verification key
    sdmfw.put_verification_key(self.verification_key)
    # We shouldn't have a checkstring yet
    self.failUnlessEqual(sdmfw.get_checkstring(), "")
    d = sdmfw.finish_publishing()
    # NOTE(review): a `def _then(results):` header appears to be
    # missing before the indented statements below.
        self.failIf(results[0])
        # this is the correct checkstring
        self._expected_checkstring = results[1][0][0]
        return self._expected_checkstring
    d.addCallback(_then)
    d.addCallback(sdmfw.set_checkstring)
    d.addCallback(lambda ignored:
                  sdmfw.get_checkstring())
    d.addCallback(lambda checkstring:
                  self.failUnlessEqual(checkstring, self._expected_checkstring))
    d.addCallback(lambda ignored:
                  sdmfw.finish_publishing())
    def _then_again(results):
        self.failUnless(results[0])
        read = self.ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(1, 8)]),
                             {0: [struct.pack(">Q", 1)]})
        self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
    d.addCallback(_then_again)
class Stats(unittest.TestCase):
    """Tests for the StorageServer's latency-sample bookkeeping."""

    # NOTE(review): `def setUp(self):` and `def tearDown(self):` headers
    # appear to be missing from this copy of the file.
    self.sparent = LoggingServiceParent()
    self._lease_secret = itertools.count()
    return self.sparent.stopService()

    def workdir(self, name):
        # Per-test scratch directory under ./storage/Server/.
        basedir = os.path.join("storage", "Server", name)
        # NOTE(review): `return basedir` appears to be missing here.

    def create(self, name):
        # Build a StorageServer rooted in the test's workdir; the trailing
        # `return ss` appears to be missing from this copy.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_latencies(self):
        """get_latencies reports mean/percentiles per operation category."""
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        # NOTE(review): the `for` headers for the "write" (range(20)) and
        # "cancel" (range(10)) sample loops appear to be missing.
        ss.add_latency("write", 1.0 * i)
        ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
        output = ss.get_latencies()
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))
        # only the last 1000 allocate samples are retained
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
        # small sample sets: high percentiles are undefined (None)
        self.failUnlessEqual(len(ss.latencies["write"]), 20)
        self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
        self.failUnless(output["write"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
        self.failUnless(output["write"]["99_0_percentile"] is None, output)
        self.failUnless(output["write"]["99_9_percentile"] is None, output)
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
        self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
        # a single sample: even the mean is undefined
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(output["get"]["mean"] is None, output)
        self.failUnless(output["get"]["01_0_percentile"] is None, output)
        self.failUnless(output["get"]["10_0_percentile"] is None, output)
        self.failUnless(output["get"]["50_0_percentile"] is None, output)
        self.failUnless(output["get"]["90_0_percentile"] is None, output)
        self.failUnless(output["get"]["95_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_9_percentile"] is None, output)
# NOTE(review): the enclosing `def remove_tags(s):` header and the
# trailing `return s` appear to be missing from this copy -- these two
# statements strip HTML tags and collapse whitespace for text matching.
s = re.sub(r'<[^>]*>', ' ', s)
s = re.sub(r'\s+', ' ', s)
class MyBucketCountingCrawler(BucketCountingCrawler):
    """Bucket-counting crawler that fires test hooks as prefixes finish."""
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # NOTE(review): a guard (likely `if self.hook_ds:`) and the
        # `d.callback(None)` firing the popped hook appear to be missing
        # from this copy of the file.
        d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer variant whose bucket counter fires test hooks."""
    def add_bucket_counter(self):
        # Mirror the base implementation, but install the hookable
        # MyBucketCountingCrawler instead of the stock one.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Tests for the bucket-counting crawler and its web status display."""

    # NOTE(review): the `def setUp(self):` and `def tearDown(self):`
    # headers appear to be missing from this copy of the file.
    self.s = service.MultiService()
    self.s.startService()
    return self.s.stopService()

    def test_bucket_counter(self):
        """The crawler's progress and totals show up in the status page."""
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)
        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                # not started yet; poll again on the next reactor turn.
                # NOTE(review): the `d2.addCallback(d.callback)` /
                # `return` lines appear to be missing here.
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)
        # now give it enough time to complete a full cycle
        # NOTE(review): a `def _watch():` header appears to be missing
        # before the indented return below.
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)

    def test_bucket_counter_cleanup(self):
        """Bogus crawler state is discarded at the end of a cycle."""
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        d = fireEventually()
        def _after_first_prefix(ignored):
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                # not started yet; poll again on the next reactor turn.
                # NOTE(review): the `d2.addCallback(d.callback)` /
                # `return` lines appear to be missing here.
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)
        # now give it enough time to complete a cycle
        # NOTE(review): a `def _watch():` header appears to be missing
        # before the indented return below.
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)

    def test_bucket_counter_eta(self):
        """An ETA appears in the status page once a prefix has finished."""
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)
        d = defer.Deferred()
        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)
        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        def _check_3(ignored):
            # two prefixes have finished
            # NOTE(review): the `d.callback(None)` that completes the test
            # appears to be missing from this copy.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)
        ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """A lease crawler that tests can freeze right after its first bucket.

    Setting stop_after_first_bucket makes the very next processed bucket
    zero out the CPU budget (cpu_slice = -1.0), so the crawler yields and
    the test can inspect mid-cycle state; yielding() later restores a big
    budget so the cycle finishes quickly.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # trip only once: clear the flag, then exhaust the time budget
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if self.stop_after_first_bucket:
            return
        # no longer paused: give the crawler a generous slice to finish
        self.cpu_slice = 500
3045 class BrokenStatResults:
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    """A lease crawler whose stat() results lack the st_blocks attribute,
    simulating platforms whose os.stat() does not report block usage.

    Restores the statements dropped by extraction: the stat() header, the
    os.stat() call, the two ``continue`` lines, and ``return bsr``.
    """
    def stat(self, fn):
        s = os.stat(fn)
        bsr = BrokenStatResults()
        # copy every public stat field except st_blocks
        for attrname in dir(s):
            if attrname.startswith("_"):
                continue
            if attrname == "st_blocks":
                continue
            setattr(bsr, attrname, getattr(s, attrname))
        return bsr
# A StorageServer whose lease crawler is the instrumented variant, so tests
# can pause the crawl right after the first bucket.
class InstrumentedStorageServer(StorageServer):
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
# A StorageServer whose lease crawler sees stat() results without st_blocks,
# exercising the "report diskbytes==sharebytes" fallback (see
# test_no_st_blocks below).
class No_ST_BLOCKS_StorageServer(StorageServer):
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3064 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3067 self.s = service.MultiService()
3068 self.s.startService()
3070 return self.s.stopService()
3072 def make_shares(self, ss):
3074 return (si, hashutil.tagged_hash("renew", si),
3075 hashutil.tagged_hash("cancel", si))
3076 def make_mutable(si):
3077 return (si, hashutil.tagged_hash("renew", si),
3078 hashutil.tagged_hash("cancel", si),
3079 hashutil.tagged_hash("write-enabler", si))
3080 def make_extra_lease(si, num):
3081 return (hashutil.tagged_hash("renew-%d" % num, si),
3082 hashutil.tagged_hash("cancel-%d" % num, si))
3084 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3085 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3086 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3087 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3088 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3089 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3091 canary = FakeCanary()
3092 # note: 'tahoe debug dump-share' will not handle this file, since the
3093 # inner contents are not a valid CHK share
3094 data = "\xff" * 1000
3096 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3098 w[0].remote_write(0, data)
3101 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3103 w[0].remote_write(0, data)
3105 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3107 writev = ss.remote_slot_testv_and_readv_and_writev
3108 writev(mutable_si_2, (we2, rs2, cs2),
3109 {0: ([], [(0,data)], len(data))}, [])
3110 writev(mutable_si_3, (we3, rs3, cs3),
3111 {0: ([], [(0,data)], len(data))}, [])
3112 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3114 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3115 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3116 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3118 def test_basic(self):
3119 basedir = "storage/LeaseCrawler/basic"
3120 fileutil.make_dirs(basedir)
3121 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3122 # make it start sooner than usual.
3123 lc = ss.lease_checker
3126 lc.stop_after_first_bucket = True
3127 webstatus = StorageStatus(ss)
3129 # create a few shares, with some leases on them
3130 self.make_shares(ss)
3131 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3133 # add a non-sharefile to exercise another code path
3134 fn = os.path.join(ss.sharedir,
3135 storage_index_to_dir(immutable_si_0),
3138 f.write("I am not a share.\n")
3141 # this is before the crawl has started, so we're not in a cycle yet
3142 initial_state = lc.get_state()
3143 self.failIf(lc.get_progress()["cycle-in-progress"])
3144 self.failIfIn("cycle-to-date", initial_state)
3145 self.failIfIn("estimated-remaining-cycle", initial_state)
3146 self.failIfIn("estimated-current-cycle", initial_state)
3147 self.failUnlessIn("history", initial_state)
3148 self.failUnlessEqual(initial_state["history"], {})
3150 ss.setServiceParent(self.s)
3154 d = fireEventually()
3156 # now examine the state right after the first bucket has been
3158 def _after_first_bucket(ignored):
3159 initial_state = lc.get_state()
3160 if "cycle-to-date" not in initial_state:
3161 d2 = fireEventually()
3162 d2.addCallback(_after_first_bucket)
3164 self.failUnlessIn("cycle-to-date", initial_state)
3165 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3166 self.failUnlessIn("estimated-current-cycle", initial_state)
3167 self.failUnlessIn("history", initial_state)
3168 self.failUnlessEqual(initial_state["history"], {})
3170 so_far = initial_state["cycle-to-date"]
3171 self.failUnlessEqual(so_far["expiration-enabled"], False)
3172 self.failUnlessIn("configured-expiration-mode", so_far)
3173 self.failUnlessIn("lease-age-histogram", so_far)
3174 lah = so_far["lease-age-histogram"]
3175 self.failUnlessEqual(type(lah), list)
3176 self.failUnlessEqual(len(lah), 1)
3177 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3178 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3179 self.failUnlessEqual(so_far["corrupt-shares"], [])
3180 sr1 = so_far["space-recovered"]
3181 self.failUnlessEqual(sr1["examined-buckets"], 1)
3182 self.failUnlessEqual(sr1["examined-shares"], 1)
3183 self.failUnlessEqual(sr1["actual-shares"], 0)
3184 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3185 self.failUnlessEqual(sr1["original-sharebytes"], 0)
3186 left = initial_state["estimated-remaining-cycle"]
3187 sr2 = left["space-recovered"]
3188 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3189 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3190 self.failIfEqual(sr2["actual-shares"], None)
3191 self.failIfEqual(sr2["configured-diskbytes"], None)
3192 self.failIfEqual(sr2["original-sharebytes"], None)
3193 d.addCallback(_after_first_bucket)
3194 d.addCallback(lambda ign: self.render1(webstatus))
3195 def _check_html_in_cycle(html):
3196 s = remove_tags(html)
3197 self.failUnlessIn("So far, this cycle has examined "
3198 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3199 self.failUnlessIn("and has recovered: "
3200 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3201 "0 B (0 B / 0 B)", s)
3202 self.failUnlessIn("If expiration were enabled, "
3203 "we would have recovered: "
3204 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3205 " 0 B (0 B / 0 B) by now", s)
3206 self.failUnlessIn("and the remainder of this cycle "
3207 "would probably recover: "
3208 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3209 " 0 B (0 B / 0 B)", s)
3210 self.failUnlessIn("and the whole cycle would probably recover: "
3211 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3212 " 0 B (0 B / 0 B)", s)
3213 self.failUnlessIn("if we were strictly using each lease's default "
3214 "31-day lease lifetime", s)
3215 self.failUnlessIn("this cycle would be expected to recover: ", s)
3216 d.addCallback(_check_html_in_cycle)
3218 # wait for the crawler to finish the first cycle. Nothing should have
3221 return bool(lc.get_state()["last-cycle-finished"] is not None)
3222 d.addCallback(lambda ign: self.poll(_wait))
3224 def _after_first_cycle(ignored):
3226 self.failIf("cycle-to-date" in s)
3227 self.failIf("estimated-remaining-cycle" in s)
3228 self.failIf("estimated-current-cycle" in s)
3229 last = s["history"][0]
3230 self.failUnlessIn("cycle-start-finish-times", last)
3231 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3232 self.failUnlessEqual(last["expiration-enabled"], False)
3233 self.failUnlessIn("configured-expiration-mode", last)
3235 self.failUnlessIn("lease-age-histogram", last)
3236 lah = last["lease-age-histogram"]
3237 self.failUnlessEqual(type(lah), list)
3238 self.failUnlessEqual(len(lah), 1)
3239 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3241 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3242 self.failUnlessEqual(last["corrupt-shares"], [])
3244 rec = last["space-recovered"]
3245 self.failUnlessEqual(rec["examined-buckets"], 4)
3246 self.failUnlessEqual(rec["examined-shares"], 4)
3247 self.failUnlessEqual(rec["actual-buckets"], 0)
3248 self.failUnlessEqual(rec["original-buckets"], 0)
3249 self.failUnlessEqual(rec["configured-buckets"], 0)
3250 self.failUnlessEqual(rec["actual-shares"], 0)
3251 self.failUnlessEqual(rec["original-shares"], 0)
3252 self.failUnlessEqual(rec["configured-shares"], 0)
3253 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3254 self.failUnlessEqual(rec["original-diskbytes"], 0)
3255 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3256 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3257 self.failUnlessEqual(rec["original-sharebytes"], 0)
3258 self.failUnlessEqual(rec["configured-sharebytes"], 0)
3260 def _get_sharefile(si):
3261 return list(ss._iter_share_files(si))[0]
3262 def count_leases(si):
3263 return len(list(_get_sharefile(si).get_leases()))
3264 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3265 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3266 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3267 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3268 d.addCallback(_after_first_cycle)
3269 d.addCallback(lambda ign: self.render1(webstatus))
3270 def _check_html(html):
3271 s = remove_tags(html)
3272 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3273 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3274 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3275 "(2 mutable / 2 immutable),", s)
3276 self.failUnlessIn("but expiration was not enabled", s)
3277 d.addCallback(_check_html)
3278 d.addCallback(lambda ign: self.render_json(webstatus))
3279 def _check_json(json):
3280 data = simplejson.loads(json)
3281 self.failUnlessIn("lease-checker", data)
3282 self.failUnlessIn("lease-checker-progress", data)
3283 d.addCallback(_check_json)
3286 def backdate_lease(self, sf, renew_secret, new_expire_time):
3287 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3288 # "renew" a lease with a new_expire_time that is older than what the
3289 # current lease has), so we have to reach inside it.
3290 for i,lease in enumerate(sf.get_leases()):
3291 if lease.renew_secret == renew_secret:
3292 lease.expiration_time = new_expire_time
3293 f = open(sf.home, 'rb+')
3294 sf._write_lease_record(f, i, lease)
3297 raise IndexError("unable to renew non-existent lease")
3299 def test_expire_age(self):
3300 basedir = "storage/LeaseCrawler/expire_age"
3301 fileutil.make_dirs(basedir)
3302 # setting expiration_time to 2000 means that any lease which is more
3303 # than 2000s old will be expired.
3304 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3305 expiration_enabled=True,
3306 expiration_mode="age",
3307 expiration_override_lease_duration=2000)
3308 # make it start sooner than usual.
3309 lc = ss.lease_checker
3311 lc.stop_after_first_bucket = True
3312 webstatus = StorageStatus(ss)
3314 # create a few shares, with some leases on them
3315 self.make_shares(ss)
3316 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3318 def count_shares(si):
3319 return len(list(ss._iter_share_files(si)))
3320 def _get_sharefile(si):
3321 return list(ss._iter_share_files(si))[0]
3322 def count_leases(si):
3323 return len(list(_get_sharefile(si).get_leases()))
3325 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3326 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3327 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3328 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3329 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3330 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3331 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3332 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3334 # artificially crank back the expiration time on the first lease of
3335 # each share, to make it look like it expired already (age=1000s).
3336 # Some shares have an extra lease which is set to expire at the
3337 # default time in 31 days from now (age=31days). We then run the
3338 # crawler, which will expire the first lease, making some shares get
3339 # deleted and others stay alive (with one remaining lease)
3342 sf0 = _get_sharefile(immutable_si_0)
3343 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3344 sf0_size = os.stat(sf0.home).st_size
3346 # immutable_si_1 gets an extra lease
3347 sf1 = _get_sharefile(immutable_si_1)
3348 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3350 sf2 = _get_sharefile(mutable_si_2)
3351 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3352 sf2_size = os.stat(sf2.home).st_size
3354 # mutable_si_3 gets an extra lease
3355 sf3 = _get_sharefile(mutable_si_3)
3356 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3358 ss.setServiceParent(self.s)
3360 d = fireEventually()
3361 # examine the state right after the first bucket has been processed
3362 def _after_first_bucket(ignored):
3363 p = lc.get_progress()
3364 if not p["cycle-in-progress"]:
3365 d2 = fireEventually()
3366 d2.addCallback(_after_first_bucket)
3368 d.addCallback(_after_first_bucket)
3369 d.addCallback(lambda ign: self.render1(webstatus))
3370 def _check_html_in_cycle(html):
3371 s = remove_tags(html)
3372 # the first bucket encountered gets deleted, and its prefix
3373 # happens to be about 1/5th of the way through the ring, so the
3374 # predictor thinks we'll have 5 shares and that we'll delete them
3375 # all. This part of the test depends upon the SIs landing right
3376 # where they do now.
3377 self.failUnlessIn("The remainder of this cycle is expected to "
3378 "recover: 4 shares, 4 buckets", s)
3379 self.failUnlessIn("The whole cycle is expected to examine "
3380 "5 shares in 5 buckets and to recover: "
3381 "5 shares, 5 buckets", s)
3382 d.addCallback(_check_html_in_cycle)
3384 # wait for the crawler to finish the first cycle. Two shares should
3387 return bool(lc.get_state()["last-cycle-finished"] is not None)
3388 d.addCallback(lambda ign: self.poll(_wait))
3390 def _after_first_cycle(ignored):
3391 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3392 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3393 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3394 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3395 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3396 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3399 last = s["history"][0]
3401 self.failUnlessEqual(last["expiration-enabled"], True)
3402 self.failUnlessEqual(last["configured-expiration-mode"],
3403 ("age", 2000, None, ("mutable", "immutable")))
3404 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3406 rec = last["space-recovered"]
3407 self.failUnlessEqual(rec["examined-buckets"], 4)
3408 self.failUnlessEqual(rec["examined-shares"], 4)
3409 self.failUnlessEqual(rec["actual-buckets"], 2)
3410 self.failUnlessEqual(rec["original-buckets"], 2)
3411 self.failUnlessEqual(rec["configured-buckets"], 2)
3412 self.failUnlessEqual(rec["actual-shares"], 2)
3413 self.failUnlessEqual(rec["original-shares"], 2)
3414 self.failUnlessEqual(rec["configured-shares"], 2)
3415 size = sf0_size + sf2_size
3416 self.failUnlessEqual(rec["actual-sharebytes"], size)
3417 self.failUnlessEqual(rec["original-sharebytes"], size)
3418 self.failUnlessEqual(rec["configured-sharebytes"], size)
3419 # different platforms have different notions of "blocks used by
3420 # this file", so merely assert that it's a number
3421 self.failUnless(rec["actual-diskbytes"] >= 0,
3422 rec["actual-diskbytes"])
3423 self.failUnless(rec["original-diskbytes"] >= 0,
3424 rec["original-diskbytes"])
3425 self.failUnless(rec["configured-diskbytes"] >= 0,
3426 rec["configured-diskbytes"])
3427 d.addCallback(_after_first_cycle)
3428 d.addCallback(lambda ign: self.render1(webstatus))
3429 def _check_html(html):
3430 s = remove_tags(html)
3431 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3432 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3433 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3434 d.addCallback(_check_html)
3437 def test_expire_cutoff_date(self):
3438 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3439 fileutil.make_dirs(basedir)
3440 # setting cutoff-date to 2000 seconds ago means that any lease which
3441 # is more than 2000s old will be expired.
3443 then = int(now - 2000)
3444 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3445 expiration_enabled=True,
3446 expiration_mode="cutoff-date",
3447 expiration_cutoff_date=then)
3448 # make it start sooner than usual.
3449 lc = ss.lease_checker
3451 lc.stop_after_first_bucket = True
3452 webstatus = StorageStatus(ss)
3454 # create a few shares, with some leases on them
3455 self.make_shares(ss)
3456 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3458 def count_shares(si):
3459 return len(list(ss._iter_share_files(si)))
3460 def _get_sharefile(si):
3461 return list(ss._iter_share_files(si))[0]
3462 def count_leases(si):
3463 return len(list(_get_sharefile(si).get_leases()))
3465 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3466 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3467 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3468 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3469 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3470 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3471 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3472 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3474 # artificially crank back the expiration time on the first lease of
3475 # each share, to make it look like was renewed 3000s ago. To achieve
3476 # this, we need to set the expiration time to now-3000+31days. This
3477 # will change when the lease format is improved to contain both
3478 # create/renew time and duration.
3479 new_expiration_time = now - 3000 + 31*24*60*60
3481 # Some shares have an extra lease which is set to expire at the
3482 # default time in 31 days from now (age=31days). We then run the
3483 # crawler, which will expire the first lease, making some shares get
3484 # deleted and others stay alive (with one remaining lease)
3486 sf0 = _get_sharefile(immutable_si_0)
3487 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3488 sf0_size = os.stat(sf0.home).st_size
3490 # immutable_si_1 gets an extra lease
3491 sf1 = _get_sharefile(immutable_si_1)
3492 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3494 sf2 = _get_sharefile(mutable_si_2)
3495 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3496 sf2_size = os.stat(sf2.home).st_size
3498 # mutable_si_3 gets an extra lease
3499 sf3 = _get_sharefile(mutable_si_3)
3500 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3502 ss.setServiceParent(self.s)
3504 d = fireEventually()
3505 # examine the state right after the first bucket has been processed
3506 def _after_first_bucket(ignored):
3507 p = lc.get_progress()
3508 if not p["cycle-in-progress"]:
3509 d2 = fireEventually()
3510 d2.addCallback(_after_first_bucket)
3512 d.addCallback(_after_first_bucket)
3513 d.addCallback(lambda ign: self.render1(webstatus))
3514 def _check_html_in_cycle(html):
3515 s = remove_tags(html)
3516 # the first bucket encountered gets deleted, and its prefix
3517 # happens to be about 1/5th of the way through the ring, so the
3518 # predictor thinks we'll have 5 shares and that we'll delete them
3519 # all. This part of the test depends upon the SIs landing right
3520 # where they do now.
3521 self.failUnlessIn("The remainder of this cycle is expected to "
3522 "recover: 4 shares, 4 buckets", s)
3523 self.failUnlessIn("The whole cycle is expected to examine "
3524 "5 shares in 5 buckets and to recover: "
3525 "5 shares, 5 buckets", s)
3526 d.addCallback(_check_html_in_cycle)
3528 # wait for the crawler to finish the first cycle. Two shares should
3531 return bool(lc.get_state()["last-cycle-finished"] is not None)
3532 d.addCallback(lambda ign: self.poll(_wait))
3534 def _after_first_cycle(ignored):
3535 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3536 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3537 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3538 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3539 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3540 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3543 last = s["history"][0]
3545 self.failUnlessEqual(last["expiration-enabled"], True)
3546 self.failUnlessEqual(last["configured-expiration-mode"],
3547 ("cutoff-date", None, then,
3548 ("mutable", "immutable")))
3549 self.failUnlessEqual(last["leases-per-share-histogram"],
3552 rec = last["space-recovered"]
3553 self.failUnlessEqual(rec["examined-buckets"], 4)
3554 self.failUnlessEqual(rec["examined-shares"], 4)
3555 self.failUnlessEqual(rec["actual-buckets"], 2)
3556 self.failUnlessEqual(rec["original-buckets"], 0)
3557 self.failUnlessEqual(rec["configured-buckets"], 2)
3558 self.failUnlessEqual(rec["actual-shares"], 2)
3559 self.failUnlessEqual(rec["original-shares"], 0)
3560 self.failUnlessEqual(rec["configured-shares"], 2)
3561 size = sf0_size + sf2_size
3562 self.failUnlessEqual(rec["actual-sharebytes"], size)
3563 self.failUnlessEqual(rec["original-sharebytes"], 0)
3564 self.failUnlessEqual(rec["configured-sharebytes"], size)
3565 # different platforms have different notions of "blocks used by
3566 # this file", so merely assert that it's a number
3567 self.failUnless(rec["actual-diskbytes"] >= 0,
3568 rec["actual-diskbytes"])
3569 self.failUnless(rec["original-diskbytes"] >= 0,
3570 rec["original-diskbytes"])
3571 self.failUnless(rec["configured-diskbytes"] >= 0,
3572 rec["configured-diskbytes"])
3573 d.addCallback(_after_first_cycle)
3574 d.addCallback(lambda ign: self.render1(webstatus))
3575 def _check_html(html):
3576 s = remove_tags(html)
3577 self.failUnlessIn("Expiration Enabled:"
3578 " expired leases will be removed", s)
3579 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3580 substr = "Leases created or last renewed before %s will be considered expired." % date
3581 self.failUnlessIn(substr, s)
3582 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3583 d.addCallback(_check_html)
3586 def test_only_immutable(self):
3587 basedir = "storage/LeaseCrawler/only_immutable"
3588 fileutil.make_dirs(basedir)
3590 then = int(now - 2000)
3591 ss = StorageServer(basedir, "\x00" * 20,
3592 expiration_enabled=True,
3593 expiration_mode="cutoff-date",
3594 expiration_cutoff_date=then,
3595 expiration_sharetypes=("immutable",))
3596 lc = ss.lease_checker
3598 webstatus = StorageStatus(ss)
3600 self.make_shares(ss)
3601 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3602 # set all leases to be expirable
3603 new_expiration_time = now - 3000 + 31*24*60*60
3605 def count_shares(si):
3606 return len(list(ss._iter_share_files(si)))
3607 def _get_sharefile(si):
3608 return list(ss._iter_share_files(si))[0]
3609 def count_leases(si):
3610 return len(list(_get_sharefile(si).get_leases()))
3612 sf0 = _get_sharefile(immutable_si_0)
3613 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3614 sf1 = _get_sharefile(immutable_si_1)
3615 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3616 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3617 sf2 = _get_sharefile(mutable_si_2)
3618 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3619 sf3 = _get_sharefile(mutable_si_3)
3620 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3621 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3623 ss.setServiceParent(self.s)
3625 return bool(lc.get_state()["last-cycle-finished"] is not None)
3626 d = self.poll(_wait)
3628 def _after_first_cycle(ignored):
3629 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3630 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3631 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3632 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3633 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3634 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3635 d.addCallback(_after_first_cycle)
3636 d.addCallback(lambda ign: self.render1(webstatus))
3637 def _check_html(html):
3638 s = remove_tags(html)
3639 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3640 d.addCallback(_check_html)
3643 def test_only_mutable(self):
3644 basedir = "storage/LeaseCrawler/only_mutable"
3645 fileutil.make_dirs(basedir)
3647 then = int(now - 2000)
3648 ss = StorageServer(basedir, "\x00" * 20,
3649 expiration_enabled=True,
3650 expiration_mode="cutoff-date",
3651 expiration_cutoff_date=then,
3652 expiration_sharetypes=("mutable",))
3653 lc = ss.lease_checker
3655 webstatus = StorageStatus(ss)
3657 self.make_shares(ss)
3658 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3659 # set all leases to be expirable
3660 new_expiration_time = now - 3000 + 31*24*60*60
3662 def count_shares(si):
3663 return len(list(ss._iter_share_files(si)))
3664 def _get_sharefile(si):
3665 return list(ss._iter_share_files(si))[0]
3666 def count_leases(si):
3667 return len(list(_get_sharefile(si).get_leases()))
3669 sf0 = _get_sharefile(immutable_si_0)
3670 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3671 sf1 = _get_sharefile(immutable_si_1)
3672 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3673 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3674 sf2 = _get_sharefile(mutable_si_2)
3675 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3676 sf3 = _get_sharefile(mutable_si_3)
3677 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3678 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3680 ss.setServiceParent(self.s)
3682 return bool(lc.get_state()["last-cycle-finished"] is not None)
3683 d = self.poll(_wait)
3685 def _after_first_cycle(ignored):
3686 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3687 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3688 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3689 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3690 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3691 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3692 d.addCallback(_after_first_cycle)
3693 d.addCallback(lambda ign: self.render1(webstatus))
3694 def _check_html(html):
3695 s = remove_tags(html)
3696 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3697 d.addCallback(_check_html)
3700 def test_bad_mode(self):
3701 basedir = "storage/LeaseCrawler/bad_mode"
3702 fileutil.make_dirs(basedir)
3703 e = self.failUnlessRaises(ValueError,
3704 StorageServer, basedir, "\x00" * 20,
3705 expiration_mode="bogus")
3706 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3708 def test_parse_duration(self):
3712 p = time_format.parse_duration
3713 self.failUnlessEqual(p("7days"), 7*DAY)
3714 self.failUnlessEqual(p("31day"), 31*DAY)
3715 self.failUnlessEqual(p("60 days"), 60*DAY)
3716 self.failUnlessEqual(p("2mo"), 2*MONTH)
3717 self.failUnlessEqual(p("3 month"), 3*MONTH)
3718 self.failUnlessEqual(p("2years"), 2*YEAR)
3719 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3720 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3722 def test_parse_date(self):
3723 p = time_format.parse_date
3724 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3725 self.failUnlessEqual(p("2009-03-18"), 1237334400)
3727 def test_limited_history(self):
3728 basedir = "storage/LeaseCrawler/limited_history"
3729 fileutil.make_dirs(basedir)
3730 ss = StorageServer(basedir, "\x00" * 20)
3731 # make it start sooner than usual.
3732 lc = ss.lease_checker
3736 # create a few shares, with some leases on them
3737 self.make_shares(ss)
3739 ss.setServiceParent(self.s)
3741 def _wait_until_15_cycles_done():
3742 last = lc.state["last-cycle-finished"]
3743 if last is not None and last >= 15:
3748 d = self.poll(_wait_until_15_cycles_done)
3750 def _check(ignored):
3753 self.failUnlessEqual(len(h), 10)
3754 self.failUnlessEqual(max(h.keys()), 15)
3755 self.failUnlessEqual(min(h.keys()), 6)
3756 d.addCallback(_check)
3759 def test_unpredictable_future(self):
3760 basedir = "storage/LeaseCrawler/unpredictable_future"
3761 fileutil.make_dirs(basedir)
3762 ss = StorageServer(basedir, "\x00" * 20)
3763 # make it start sooner than usual.
3764 lc = ss.lease_checker
3766 lc.cpu_slice = -1.0 # stop quickly
3768 self.make_shares(ss)
3770 ss.setServiceParent(self.s)
3772 d = fireEventually()
3773 def _check(ignored):
3774 # this should fire after the first bucket is complete, but before
3775 # the first prefix is complete, so the progress-measurer won't
3776 # think we've gotten far enough to raise our percent-complete
3777 # above 0%, triggering the cannot-predict-the-future code in
3778 # expirer.py . This will have to change if/when the
3779 # progress-measurer gets smart enough to count buckets (we'll
3780 # have to interrupt it even earlier, before it's finished the
3783 if "cycle-to-date" not in s:
3784 d2 = fireEventually()
3785 d2.addCallback(_check)
3787 self.failUnlessIn("cycle-to-date", s)
3788 self.failUnlessIn("estimated-remaining-cycle", s)
3789 self.failUnlessIn("estimated-current-cycle", s)
3791 left = s["estimated-remaining-cycle"]["space-recovered"]
3792 self.failUnlessEqual(left["actual-buckets"], None)
3793 self.failUnlessEqual(left["original-buckets"], None)
3794 self.failUnlessEqual(left["configured-buckets"], None)
3795 self.failUnlessEqual(left["actual-shares"], None)
3796 self.failUnlessEqual(left["original-shares"], None)
3797 self.failUnlessEqual(left["configured-shares"], None)
3798 self.failUnlessEqual(left["actual-diskbytes"], None)
3799 self.failUnlessEqual(left["original-diskbytes"], None)
3800 self.failUnlessEqual(left["configured-diskbytes"], None)
3801 self.failUnlessEqual(left["actual-sharebytes"], None)
3802 self.failUnlessEqual(left["original-sharebytes"], None)
3803 self.failUnlessEqual(left["configured-sharebytes"], None)
3805 full = s["estimated-remaining-cycle"]["space-recovered"]
3806 self.failUnlessEqual(full["actual-buckets"], None)
3807 self.failUnlessEqual(full["original-buckets"], None)
3808 self.failUnlessEqual(full["configured-buckets"], None)
3809 self.failUnlessEqual(full["actual-shares"], None)
3810 self.failUnlessEqual(full["original-shares"], None)
3811 self.failUnlessEqual(full["configured-shares"], None)
3812 self.failUnlessEqual(full["actual-diskbytes"], None)
3813 self.failUnlessEqual(full["original-diskbytes"], None)
3814 self.failUnlessEqual(full["configured-diskbytes"], None)
3815 self.failUnlessEqual(full["actual-sharebytes"], None)
3816 self.failUnlessEqual(full["original-sharebytes"], None)
3817 self.failUnlessEqual(full["configured-sharebytes"], None)
3819 d.addCallback(_check)
3822 def test_no_st_blocks(self):
3823 basedir = "storage/LeaseCrawler/no_st_blocks"
3824 fileutil.make_dirs(basedir)
3825 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3826 expiration_mode="age",
3827 expiration_override_lease_duration=-1000)
3828 # a negative expiration_time= means the "configured-"
3829 # space-recovered counts will be non-zero, since all shares will have
3832 # make it start sooner than usual.
3833 lc = ss.lease_checker
3836 self.make_shares(ss)
3837 ss.setServiceParent(self.s)
3839 return bool(lc.get_state()["last-cycle-finished"] is not None)
3840 d = self.poll(_wait)
3842 def _check(ignored):
3844 last = s["history"][0]
3845 rec = last["space-recovered"]
3846 self.failUnlessEqual(rec["configured-buckets"], 4)
3847 self.failUnlessEqual(rec["configured-shares"], 4)
3848 self.failUnless(rec["configured-sharebytes"] > 0,
3849 rec["configured-sharebytes"])
3850 # without the .st_blocks field in os.stat() results, we should be
3851 # reporting diskbytes==sharebytes
3852 self.failUnlessEqual(rec["configured-sharebytes"],
3853 rec["configured-diskbytes"])
3854 d.addCallback(_check)
3857 def test_share_corruption(self):
3858 self._poll_should_ignore_these_errors = [
3859 UnknownMutableContainerVersionError,
3860 UnknownImmutableContainerVersionError,
3862 basedir = "storage/LeaseCrawler/share_corruption"
3863 fileutil.make_dirs(basedir)
3864 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3865 w = StorageStatus(ss)
3866 # make it start sooner than usual.
3867 lc = ss.lease_checker
3868 lc.stop_after_first_bucket = True
3872 # create a few shares, with some leases on them
3873 self.make_shares(ss)
3875 # now corrupt one, and make sure the lease-checker keeps going
3876 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3877 first = min(self.sis)
3878 first_b32 = base32.b2a(first)
3879 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3882 f.write("BAD MAGIC")
3884 # if get_share_file() doesn't see the correct mutable magic, it
3885 # assumes the file is an immutable share, and then
3886 # immutable.ShareFile sees a bad version. So regardless of which kind
3887 # of share we corrupted, this will trigger an
3888 # UnknownImmutableContainerVersionError.
3890 # also create an empty bucket
3891 empty_si = base32.b2a("\x04"*16)
3892 empty_bucket_dir = os.path.join(ss.sharedir,
3893 storage_index_to_dir(empty_si))
3894 fileutil.make_dirs(empty_bucket_dir)
3896 ss.setServiceParent(self.s)
3898 d = fireEventually()
3900 # now examine the state right after the first bucket has been
3902 def _after_first_bucket(ignored):
3904 if "cycle-to-date" not in s:
3905 d2 = fireEventually()
3906 d2.addCallback(_after_first_bucket)
3908 so_far = s["cycle-to-date"]
3909 rec = so_far["space-recovered"]
3910 self.failUnlessEqual(rec["examined-buckets"], 1)
3911 self.failUnlessEqual(rec["examined-shares"], 0)
3912 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3913 d.addCallback(_after_first_bucket)
3915 d.addCallback(lambda ign: self.render_json(w))
3916 def _check_json(json):
3917 data = simplejson.loads(json)
3918 # grr. json turns all dict keys into strings.
3919 so_far = data["lease-checker"]["cycle-to-date"]
3920 corrupt_shares = so_far["corrupt-shares"]
3921 # it also turns all tuples into lists
3922 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3923 d.addCallback(_check_json)
3924 d.addCallback(lambda ign: self.render1(w))
3925 def _check_html(html):
3926 s = remove_tags(html)
3927 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3928 d.addCallback(_check_html)
3931 return bool(lc.get_state()["last-cycle-finished"] is not None)
3932 d.addCallback(lambda ign: self.poll(_wait))
3934 def _after_first_cycle(ignored):
3936 last = s["history"][0]
3937 rec = last["space-recovered"]
3938 self.failUnlessEqual(rec["examined-buckets"], 5)
3939 self.failUnlessEqual(rec["examined-shares"], 3)
3940 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3941 d.addCallback(_after_first_cycle)
3942 d.addCallback(lambda ign: self.render_json(w))
3943 def _check_json_history(json):
3944 data = simplejson.loads(json)
3945 last = data["lease-checker"]["history"]["0"]
3946 corrupt_shares = last["corrupt-shares"]
3947 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3948 d.addCallback(_check_json_history)
3949 d.addCallback(lambda ign: self.render1(w))
3950 def _check_html_history(html):
3951 s = remove_tags(html)
3952 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3953 d.addCallback(_check_html_history)
3956 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3957 UnknownImmutableContainerVersionError)
3962 def render_json(self, page):
3963 d = self.render1(page, args={"t": ["json"]})
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    """Tests for the storage-server web status page (StorageStatus)."""

    def setUp(self):
        # each test gets a fresh service parent for its StorageServer
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()
3974 def test_no_server(self):
3975 w = StorageStatus(None)
3976 html = w.renderSynchronously()
3977 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3979 def test_status(self):
3980 basedir = "storage/WebStatus/status"
3981 fileutil.make_dirs(basedir)
3982 nodeid = "\x00" * 20
3983 ss = StorageServer(basedir, nodeid)
3984 ss.setServiceParent(self.s)
3985 w = StorageStatus(ss, "nickname")
3987 def _check_html(html):
3988 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3989 s = remove_tags(html)
3990 self.failUnlessIn("Server Nickname: nickname", s)
3991 self.failUnlessIn("Server Nodeid: %s" % base32.b2a(nodeid), s)
3992 self.failUnlessIn("Accepting new shares: Yes", s)
3993 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3994 d.addCallback(_check_html)
3995 d.addCallback(lambda ign: self.render_json(w))
3996 def _check_json(json):
3997 data = simplejson.loads(json)
3999 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4000 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
4001 self.failUnlessIn("bucket-counter", data)
4002 self.failUnlessIn("lease-checker", data)
4003 d.addCallback(_check_json)
4006 def render_json(self, page):
4007 d = self.render1(page, args={"t": ["json"]})
4010 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4011 def test_status_no_disk_stats(self, mock_get_disk_stats):
4012 mock_get_disk_stats.side_effect = AttributeError()
4014 # Some platforms may have no disk stats API. Make sure the code can handle that
4015 # (test runs on all platforms).
4016 basedir = "storage/WebStatus/status_no_disk_stats"
4017 fileutil.make_dirs(basedir)
4018 ss = StorageServer(basedir, "\x00" * 20)
4019 ss.setServiceParent(self.s)
4020 w = StorageStatus(ss)
4021 html = w.renderSynchronously()
4022 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4023 s = remove_tags(html)
4024 self.failUnlessIn("Accepting new shares: Yes", s)
4025 self.failUnlessIn("Total disk space: ?", s)
4026 self.failUnlessIn("Space Available to Tahoe: ?", s)
4027 self.failUnless(ss.get_available_space() is None)
4029 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4030 def test_status_bad_disk_stats(self, mock_get_disk_stats):
4031 mock_get_disk_stats.side_effect = OSError()
4033 # If the API to get disk stats exists but a call to it fails, then the status should
4034 # show that no shares will be accepted, and get_available_space() should be 0.
4035 basedir = "storage/WebStatus/status_bad_disk_stats"
4036 fileutil.make_dirs(basedir)
4037 ss = StorageServer(basedir, "\x00" * 20)
4038 ss.setServiceParent(self.s)
4039 w = StorageStatus(ss)
4040 html = w.renderSynchronously()
4041 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4042 s = remove_tags(html)
4043 self.failUnlessIn("Accepting new shares: No", s)
4044 self.failUnlessIn("Total disk space: ?", s)
4045 self.failUnlessIn("Space Available to Tahoe: ?", s)
4046 self.failUnlessEqual(ss.get_available_space(), 0)
4048 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4049 def test_status_right_disk_stats(self, mock_get_disk_stats):
4052 free_for_root = 4*GB
4053 free_for_nonroot = 3*GB
4054 reserved_space = 1*GB
4055 used = total - free_for_root
4056 avail = max(free_for_nonroot - reserved_space, 0)
4057 mock_get_disk_stats.return_value = {
4059 'free_for_root': free_for_root,
4060 'free_for_nonroot': free_for_nonroot,
4065 basedir = "storage/WebStatus/status_right_disk_stats"
4066 fileutil.make_dirs(basedir)
4067 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4068 expecteddir = ss.sharedir
4069 ss.setServiceParent(self.s)
4070 w = StorageStatus(ss)
4071 html = w.renderSynchronously()
4073 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4074 mock_get_disk_stats.call_args_list)
4076 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4077 s = remove_tags(html)
4078 self.failUnlessIn("Total disk space: 5.00 GB", s)
4079 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4080 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4081 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4082 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4083 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4084 self.failUnlessEqual(ss.get_available_space(), 2*GB)
4086 def test_readonly(self):
4087 basedir = "storage/WebStatus/readonly"
4088 fileutil.make_dirs(basedir)
4089 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4090 ss.setServiceParent(self.s)
4091 w = StorageStatus(ss)
4092 html = w.renderSynchronously()
4093 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4094 s = remove_tags(html)
4095 self.failUnlessIn("Accepting new shares: No", s)
4097 def test_reserved(self):
4098 basedir = "storage/WebStatus/reserved"
4099 fileutil.make_dirs(basedir)
4100 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4101 ss.setServiceParent(self.s)
4102 w = StorageStatus(ss)
4103 html = w.renderSynchronously()
4104 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4105 s = remove_tags(html)
4106 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4108 def test_huge_reserved(self):
4109 basedir = "storage/WebStatus/reserved"
4110 fileutil.make_dirs(basedir)
4111 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4112 ss.setServiceParent(self.s)
4113 w = StorageStatus(ss)
4114 html = w.renderSynchronously()
4115 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4116 s = remove_tags(html)
4117 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4119 def test_util(self):
4120 w = StorageStatus(None)
4121 self.failUnlessEqual(w.render_space(None, None), "?")
4122 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4123 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4124 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4125 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4126 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)