1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
40 def __init__(self, ignore_disconnectors=False):
41 self.ignore = ignore_disconnectors
42 self.disconnectors = {}
43 def notifyOnDisconnect(self, f, *args, **kwargs):
47 self.disconnectors[m] = (f, args, kwargs)
49 def dontNotifyOnDisconnect(self, marker):
52 del self.disconnectors[marker]
54 class FakeStatsProvider:
55 def count(self, name, delta=1):
57 def register_producer(self, producer):
60 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Prepare the on-disk layout for one Bucket test.

    Ensures ``storage/Bucket/<name>`` and its ``tmp`` subdirectory exist
    (via ``fileutil.make_dirs``) and returns ``(incoming, final)``: the
    path of the in-progress share under ``tmp`` and the path of the
    finished share directly under the test's base directory. Neither
    share path itself is created here.
    """
    root = os.path.join("storage", "Bucket", name)
    staging = os.path.join(root, "tmp")
    # Same order as before: the base directory first, then its tmp child.
    for directory in (root, staging):
        fileutil.make_dirs(directory)
    incoming_share = os.path.join(staging, "bucket")
    final_share = os.path.join(root, "bucket")
    return incoming_share, final_share
69 def bucket_writer_closed(self, bw, consumed):
71 def add_latency(self, category, latency):
73 def count(self, name, delta=1):
78 renew_secret = os.urandom(32)
79 cancel_secret = os.urandom(32)
80 expiration_time = time.time() + 5000
81 return LeaseInfo(owner_num, renew_secret, cancel_secret,
82 expiration_time, "\x00" * 20)
84 def test_create(self):
85 incoming, final = self.make_workdir("test_create")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*25)
91 bw.remote_write(75, "d"*7)
94 def test_readwrite(self):
95 incoming, final = self.make_workdir("test_readwrite")
96 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
98 bw.remote_write(0, "a"*25)
99 bw.remote_write(25, "b"*25)
100 bw.remote_write(50, "c"*7) # last block may be short
104 br = BucketReader(self, bw.finalhome)
105 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
109 def test_read_past_end_of_share_data(self):
110 # test vector for immutable files (hard-coded contents of an immutable share
113 # The following immutable share file content is identical to that
114 # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
115 # with share data == 'a'. The total size of this content is 85
118 containerdata = struct.pack('>LLL', 1, 1, 1)
120 # A Tahoe-LAFS storage client would send as the share_data a
121 # complicated string involving hash trees and a URI Extension Block
122 # -- see allmydata/immutable/layout.py . This test, which is
123 # simulating a client, just sends 'a'.
126 ownernumber = struct.pack('>L', 0)
127 renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
128 assert len(renewsecret) == 32
129 cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
130 assert len(cancelsecret) == 32
131 expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
133 lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
135 share_file_data = containerdata + share_data + lease_data
137 incoming, final = self.make_workdir("test_read_past_end_of_share_data")
139 fileutil.write(final, share_file_data)
141 mockstorageserver = mock.Mock()
144 br = BucketReader(mockstorageserver, final)
146 self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
148 # Read past the end of share data to get the cancel secret.
149 read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
151 result_of_read = br.remote_read(0, read_length)
152 self.failUnlessEqual(result_of_read, share_data)
154 result_of_read = br.remote_read(0, len(share_data)+1)
155 self.failUnlessEqual(result_of_read, share_data)
163 def callRemote(self, methname, *args, **kwargs):
165 meth = getattr(self.target, "remote_" + methname)
166 return meth(*args, **kwargs)
168 if methname == "slot_readv":
170 if "writev" in methname:
171 self.write_count += 1
173 return defer.maybeDeferred(_call)
176 class BucketProxy(unittest.TestCase):
177 def make_bucket(self, name, size):
178 basedir = os.path.join("storage", "BucketProxy", name)
179 incoming = os.path.join(basedir, "tmp", "bucket")
180 final = os.path.join(basedir, "bucket")
181 fileutil.make_dirs(basedir)
182 fileutil.make_dirs(os.path.join(basedir, "tmp"))
183 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
189 def make_lease(self):
191 renew_secret = os.urandom(32)
192 cancel_secret = os.urandom(32)
193 expiration_time = time.time() + 5000
194 return LeaseInfo(owner_num, renew_secret, cancel_secret,
195 expiration_time, "\x00" * 20)
197 def bucket_writer_closed(self, bw, consumed):
199 def add_latency(self, category, latency):
201 def count(self, name, delta=1):
204 def test_create(self):
205 bw, rb, sharefname = self.make_bucket("test_create", 500)
206 bp = WriteBucketProxy(rb, None,
211 uri_extension_size_max=500)
212 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
214 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
215 # Let's pretend each share has 100 bytes of data, and that there are
216 # 4 segments (25 bytes each), and 8 shares total. So the two
217 # per-segment merkle trees (crypttext_hash_tree,
218 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
219 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
220 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
221 # long. That should make the whole share:
223 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
224 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
226 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
228 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
230 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
232 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
234 uri_extension = "s" + "E"*498 + "e"
236 bw, rb, sharefname = self.make_bucket(name, sharesize)
237 bp = wbp_class(rb, None,
242 uri_extension_size_max=len(uri_extension))
245 d.addCallback(lambda res: bp.put_block(0, "a"*25))
246 d.addCallback(lambda res: bp.put_block(1, "b"*25))
247 d.addCallback(lambda res: bp.put_block(2, "c"*25))
248 d.addCallback(lambda res: bp.put_block(3, "d"*20))
249 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
250 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
251 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
252 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
253 d.addCallback(lambda res: bp.close())
255 # now read everything back
256 def _start_reading(res):
257 br = BucketReader(self, sharefname)
260 server = NoNetworkServer("abc", None)
261 rbp = rbp_class(rb, server, storage_index="")
262 self.failUnlessIn("to peer", repr(rbp))
263 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
265 d1 = rbp.get_block_data(0, 25, 25)
266 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
267 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
268 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
269 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
270 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
271 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
272 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
274 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
275 d1.addCallback(lambda res:
276 self.failUnlessEqual(res, crypttext_hashes))
277 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
278 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
279 d1.addCallback(lambda res: rbp.get_share_hashes())
280 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
281 d1.addCallback(lambda res: rbp.get_uri_extension())
282 d1.addCallback(lambda res:
283 self.failUnlessEqual(res, uri_extension))
287 d.addCallback(_start_reading)
def test_readwrite_v1(self):
    """Run the shared write/read round-trip check for the v1 write proxy.

    Delegates to ``_do_test_readwrite`` with a header size of 0x24,
    ``WriteBucketProxy`` as the writer class and ``ReadBucketProxy`` as
    the reader class, and returns whatever that helper returns so the
    test runner can wait on it.
    """
    v1_header_size = 0x24
    return self._do_test_readwrite("test_readwrite_v1", v1_header_size,
                                   WriteBucketProxy, ReadBucketProxy)
def test_readwrite_v2(self):
    """Run the shared write/read round-trip check for the v2 write proxy.

    Delegates to ``_do_test_readwrite`` with a header size of 0x44,
    ``WriteBucketProxy_v2`` as the writer class and ``ReadBucketProxy``
    as the reader class, and returns whatever that helper returns so the
    test runner can wait on it.
    """
    v2_header_size = 0x44
    return self._do_test_readwrite("test_readwrite_v2", v2_header_size,
                                   WriteBucketProxy_v2, ReadBucketProxy)
299 class Server(unittest.TestCase):
302 self.sparent = LoggingServiceParent()
303 self.sparent.startService()
304 self._lease_secret = itertools.count()
306 return self.sparent.stopService()
308 def workdir(self, name):
309 basedir = os.path.join("storage", "Server", name)
312 def create(self, name, reserved_space=0, klass=StorageServer):
313 workdir = self.workdir(name)
314 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
315 stats_provider=FakeStatsProvider())
316 ss.setServiceParent(self.sparent)
def test_create(self):
    """Smoke test: building a server through ``self.create`` must not raise."""
    test_name = "test_create"
    # The returned server is intentionally unused; construction is the test.
    self.create(test_name)
def test_declares_fixed_1528(self):
    """Check the advertised version info for the read-past-end flag.

    The dictionary returned by ``remote_get_version`` must contain, under
    the v1 storage protocol key, a truthy
    ``prevents-read-past-end-of-share-data`` entry.
    """
    protocol_key = 'http://allmydata.org/tahoe/protocols/storage/v1'
    flag_name = 'prevents-read-past-end-of-share-data'

    server = self.create("test_declares_fixed_1528")
    version_info = server.remote_get_version()
    v1_params = version_info[protocol_key]
    # Pass the whole parameter dict as the failure message for diagnosis.
    self.failUnless(v1_params.get(flag_name), v1_params)
328 def allocate(self, ss, storage_index, sharenums, size, canary=None):
329 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
330 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
332 canary = FakeCanary()
333 return ss.remote_allocate_buckets(storage_index,
334 renew_secret, cancel_secret,
335 sharenums, size, canary)
337 def test_large_share(self):
338 syslow = platform.system().lower()
339 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
340 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
342 avail = fileutil.get_available_space('.', 512*2**20)
344 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
346 ss = self.create("test_large_share")
348 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
349 self.failUnlessEqual(already, set())
350 self.failUnlessEqual(set(writers.keys()), set([0]))
352 shnum, bucket = writers.items()[0]
353 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
354 bucket.remote_write(2**32, "ab")
355 bucket.remote_close()
357 readers = ss.remote_get_buckets("allocate")
358 reader = readers[shnum]
359 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
361 def test_dont_overfill_dirs(self):
363 This test asserts that if you add a second share whose storage index
364 share lots of leading bits with an extant share (but isn't the exact
365 same storage index), this won't add an entry to the share directory.
367 ss = self.create("test_dont_overfill_dirs")
368 already, writers = self.allocate(ss, "storageindex", [0], 10)
369 for i, wb in writers.items():
370 wb.remote_write(0, "%10d" % i)
372 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
374 children_of_storedir = set(os.listdir(storedir))
376 # Now store another one under another storageindex that has leading
377 # chars the same as the first storageindex.
378 already, writers = self.allocate(ss, "storageindey", [0], 10)
379 for i, wb in writers.items():
380 wb.remote_write(0, "%10d" % i)
382 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
384 new_children_of_storedir = set(os.listdir(storedir))
385 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
387 def test_remove_incoming(self):
388 ss = self.create("test_remove_incoming")
389 already, writers = self.allocate(ss, "vid", range(3), 10)
390 for i,wb in writers.items():
391 wb.remote_write(0, "%10d" % i)
393 incoming_share_dir = wb.incominghome
394 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
395 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
396 incoming_dir = os.path.dirname(incoming_prefix_dir)
397 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
398 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
399 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
def test_abort(self):
    """Aborted writers must not count toward the server's allocated size.

    Allocates three 150-byte buckets, checks that the server reports a
    non-zero allocated size while they are open, then calls
    ``remote_abort`` on every writer and checks that the reported
    allocated size is back to zero.
    """
    server = self.create("test_abort")
    already, open_writers = self.allocate(server, "allocate", [0, 1, 2], 150)
    # The open writers should be reflected in the space accounting.
    self.failIfEqual(server.allocated_size(), 0)

    # Abort every writer; afterwards nothing should remain allocated.
    for bucket_writer in open_writers.values():
        bucket_writer.remote_abort()
    self.failUnlessEqual(server.allocated_size(), 0)
415 def test_allocate(self):
416 ss = self.create("test_allocate")
418 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
420 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
421 self.failUnlessEqual(already, set())
422 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
424 # while the buckets are open, they should not count as readable
425 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
428 for i,wb in writers.items():
429 wb.remote_write(0, "%25d" % i)
431 # aborting a bucket that was already closed is a no-op
434 # now they should be readable
435 b = ss.remote_get_buckets("allocate")
436 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
437 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
439 self.failUnlessIn("BucketReader", b_str)
440 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
442 # now if we ask about writing again, the server should offer those
443 # three buckets as already present. It should offer them even if we
444 # don't ask about those specific ones.
445 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
446 self.failUnlessEqual(already, set([0,1,2]))
447 self.failUnlessEqual(set(writers.keys()), set([3,4]))
449 # while those two buckets are open for writing, the server should
450 # refuse to offer them to uploaders
452 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
453 self.failUnlessEqual(already2, set([0,1,2]))
454 self.failUnlessEqual(set(writers2.keys()), set([5]))
456 # aborting the writes should remove the tempfiles
457 for i,wb in writers2.items():
459 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
460 self.failUnlessEqual(already2, set([0,1,2]))
461 self.failUnlessEqual(set(writers2.keys()), set([5]))
463 for i,wb in writers2.items():
465 for i,wb in writers.items():
468 def test_bad_container_version(self):
469 ss = self.create("test_bad_container_version")
470 a,w = self.allocate(ss, "si1", [0], 10)
471 w[0].remote_write(0, "\xff"*10)
474 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
477 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
480 ss.remote_get_buckets("allocate")
482 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
483 ss.remote_get_buckets, "si1")
484 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
486 def test_disconnect(self):
487 # simulate a disconnection
488 ss = self.create("test_disconnect")
489 canary = FakeCanary()
490 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
491 self.failUnlessEqual(already, set())
492 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
493 for (f,args,kwargs) in canary.disconnectors.values():
498 # that ought to delete the incoming shares
499 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
500 self.failUnlessEqual(already, set())
501 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
503 @mock.patch('allmydata.util.fileutil.get_disk_stats')
504 def test_reserved_space(self, mock_get_disk_stats):
506 mock_get_disk_stats.return_value = {
507 'free_for_nonroot': 15000,
508 'avail': max(15000 - reserved_space, 0),
511 ss = self.create("test_reserved_space", reserved_space=reserved_space)
512 # 15k available, 10k reserved, leaves 5k for shares
514 # a newly created and filled share incurs this much overhead, beyond
515 # the size we request.
517 LEASE_SIZE = 4+32+32+4
518 canary = FakeCanary(True)
519 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
520 self.failUnlessEqual(len(writers), 3)
521 # now the StorageServer should have 3000 bytes provisionally
522 # allocated, allowing only 2000 more to be claimed
523 self.failUnlessEqual(len(ss._active_writers), 3)
525 # allocating 1001-byte shares only leaves room for one
526 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
527 self.failUnlessEqual(len(writers2), 1)
528 self.failUnlessEqual(len(ss._active_writers), 4)
530 # we abandon the first set, so their provisional allocation should be
534 self.failUnlessEqual(len(ss._active_writers), 1)
535 # now we have a provisional allocation of 1001 bytes
537 # and we close the second set, so their provisional allocation should
538 # become real, long-term allocation, and grows to include the
540 for bw in writers2.values():
541 bw.remote_write(0, "a"*25)
546 self.failUnlessEqual(len(ss._active_writers), 0)
548 allocated = 1001 + OVERHEAD + LEASE_SIZE
550 # we have to manually increase available, since we're not doing real
552 mock_get_disk_stats.return_value = {
553 'free_for_nonroot': 15000 - allocated,
554 'avail': max(15000 - allocated - reserved_space, 0),
557 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
558 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
559 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
560 self.failUnlessEqual(len(writers3), 39)
561 self.failUnlessEqual(len(ss._active_writers), 39)
565 self.failUnlessEqual(len(ss._active_writers), 0)
566 ss.disownServiceParent()
570 basedir = self.workdir("test_seek_behavior")
571 fileutil.make_dirs(basedir)
572 filename = os.path.join(basedir, "testfile")
573 f = open(filename, "wb")
576 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
577 # files. mode="a" preserves previous contents but does not allow
578 # seeking-to-create-holes. mode="r+" allows both.
579 f = open(filename, "rb+")
583 filelen = os.stat(filename)[stat.ST_SIZE]
584 self.failUnlessEqual(filelen, 100+3)
585 f2 = open(filename, "rb")
586 self.failUnlessEqual(f2.read(5), "start")
589 def test_leases(self):
590 ss = self.create("test_leases")
591 canary = FakeCanary()
595 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
596 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
597 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
598 sharenums, size, canary)
599 self.failUnlessEqual(len(already), 0)
600 self.failUnlessEqual(len(writers), 5)
601 for wb in writers.values():
604 leases = list(ss.get_leases("si0"))
605 self.failUnlessEqual(len(leases), 1)
606 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
608 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
611 sharenums, size, canary)
612 for wb in writers.values():
615 # take out a second lease on si1
616 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
617 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
618 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
619 sharenums, size, canary)
620 self.failUnlessEqual(len(already), 5)
621 self.failUnlessEqual(len(writers), 0)
623 leases = list(ss.get_leases("si1"))
624 self.failUnlessEqual(len(leases), 2)
625 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
627 # and a third lease, using add-lease
628 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
629 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
630 ss.remote_add_lease("si1", rs2a, cs2a)
631 leases = list(ss.get_leases("si1"))
632 self.failUnlessEqual(len(leases), 3)
633 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
635 # add-lease on a missing storage index is silently ignored
636 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
638 # check that si0 is readable
639 readers = ss.remote_get_buckets("si0")
640 self.failUnlessEqual(len(readers), 5)
642 # renew the first lease. Only the proper renew_secret should work
643 ss.remote_renew_lease("si0", rs0)
644 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
645 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
647 # check that si0 is still readable
648 readers = ss.remote_get_buckets("si0")
649 self.failUnlessEqual(len(readers), 5)
652 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
653 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
654 ss.remote_cancel_lease("si0", cs0)
656 # si0 should now be gone
657 readers = ss.remote_get_buckets("si0")
658 self.failUnlessEqual(len(readers), 0)
659 # and the renew should no longer work
660 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
663 # cancel the first lease on si1, leaving the second and third in place
664 ss.remote_cancel_lease("si1", cs1)
665 readers = ss.remote_get_buckets("si1")
666 self.failUnlessEqual(len(readers), 5)
667 # the corresponding renew should no longer work
668 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
670 leases = list(ss.get_leases("si1"))
671 self.failUnlessEqual(len(leases), 2)
672 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
674 ss.remote_renew_lease("si1", rs2)
675 # cancelling the second and third should make it go away
676 ss.remote_cancel_lease("si1", cs2)
677 ss.remote_cancel_lease("si1", cs2a)
678 readers = ss.remote_get_buckets("si1")
679 self.failUnlessEqual(len(readers), 0)
680 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
681 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
682 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
684 leases = list(ss.get_leases("si1"))
685 self.failUnlessEqual(len(leases), 0)
688 # test overlapping uploads
689 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
690 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
691 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
692 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
693 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
694 sharenums, size, canary)
695 self.failUnlessEqual(len(already), 0)
696 self.failUnlessEqual(len(writers), 5)
697 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
698 sharenums, size, canary)
699 self.failUnlessEqual(len(already2), 0)
700 self.failUnlessEqual(len(writers2), 0)
701 for wb in writers.values():
704 leases = list(ss.get_leases("si3"))
705 self.failUnlessEqual(len(leases), 1)
707 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
708 sharenums, size, canary)
709 self.failUnlessEqual(len(already3), 5)
710 self.failUnlessEqual(len(writers3), 0)
712 leases = list(ss.get_leases("si3"))
713 self.failUnlessEqual(len(leases), 2)
def test_readonly(self):
    """A server created with ``readonly_storage=True`` accepts no shares.

    Allocation must return no already-present shares and no writers, the
    stats must report ``accepting_immutable_shares`` as 0, and, when the
    platform exposes it, ``disk_avail`` must be 0 as well.
    """
    basedir = self.workdir("test_readonly")
    server = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.sparent)

    already, writers = self.allocate(server, "vid", [0,1,2], 75)
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(writers, {})

    server_stats = server.get_stats()
    self.failUnlessEqual(server_stats["storage_server.accepting_immutable_shares"], 0)
    if "storage_server.disk_avail" not in server_stats:
        # Some platforms may not have an API to get disk stats, in which
        # case there is nothing further to check.
        return
    # When disk stats are present, readonly_storage means disk_avail=0.
    self.failUnlessEqual(server_stats["storage_server.disk_avail"], 0)
731 def test_discard(self):
732 # discard is really only used for other tests, but we test it anyways
733 workdir = self.workdir("test_discard")
734 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
735 ss.setServiceParent(self.sparent)
737 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
738 self.failUnlessEqual(already, set())
739 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
740 for i,wb in writers.items():
741 wb.remote_write(0, "%25d" % i)
743 # since we discard the data, the shares should be present but sparse.
744 # Since we write with some seeks, the data we read back will be all
746 b = ss.remote_get_buckets("vid")
747 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
748 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
750 def test_advise_corruption(self):
751 workdir = self.workdir("test_advise_corruption")
752 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
753 ss.setServiceParent(self.sparent)
755 si0_s = base32.b2a("si0")
756 ss.remote_advise_corrupt_share("immutable", "si0", 0,
757 "This share smells funny.\n")
758 reportdir = os.path.join(workdir, "corruption-advisories")
759 reports = os.listdir(reportdir)
760 self.failUnlessEqual(len(reports), 1)
761 report_si0 = reports[0]
762 self.failUnlessIn(si0_s, report_si0)
763 f = open(os.path.join(reportdir, report_si0), "r")
766 self.failUnlessIn("type: immutable", report)
767 self.failUnlessIn("storage_index: %s" % si0_s, report)
768 self.failUnlessIn("share_number: 0", report)
769 self.failUnlessIn("This share smells funny.", report)
771 # test the RIBucketWriter version too
772 si1_s = base32.b2a("si1")
773 already,writers = self.allocate(ss, "si1", [1], 75)
774 self.failUnlessEqual(already, set())
775 self.failUnlessEqual(set(writers.keys()), set([1]))
776 writers[1].remote_write(0, "data")
777 writers[1].remote_close()
779 b = ss.remote_get_buckets("si1")
780 self.failUnlessEqual(set(b.keys()), set([1]))
781 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
783 reports = os.listdir(reportdir)
784 self.failUnlessEqual(len(reports), 2)
785 report_si1 = [r for r in reports if si1_s in r][0]
786 f = open(os.path.join(reportdir, report_si1), "r")
789 self.failUnlessIn("type: immutable", report)
790 self.failUnlessIn("storage_index: %s" % si1_s, report)
791 self.failUnlessIn("share_number: 1", report)
792 self.failUnlessIn("This share tastes like dust.", report)
796 class MutableServer(unittest.TestCase):
799 self.sparent = LoggingServiceParent()
800 self._lease_secret = itertools.count()
802 return self.sparent.stopService()
804 def workdir(self, name):
805 basedir = os.path.join("storage", "MutableServer", name)
808 def create(self, name):
809 workdir = self.workdir(name)
810 ss = StorageServer(workdir, "\x00" * 20)
811 ss.setServiceParent(self.sparent)
def test_create(self):
    """Smoke test: ``self.create`` must build a server without raising."""
    workdir_name = "test_create"
    # Only construction is exercised; the server object itself is unused.
    self.create(workdir_name)
def write_enabler(self, we_tag):
    """Return the write-enabler secret for ``we_tag``.

    The value is ``hashutil.tagged_hash`` of ``we_tag`` under the
    ``"we_blah"`` tag, so the same ``we_tag`` is always hashed with the
    same inputs.
    """
    hash_tag = "we_blah"
    return hashutil.tagged_hash(hash_tag, we_tag)
def renew_secret(self, tag):
    """Return the lease renew secret for ``tag``.

    ``tag`` is converted with ``str()`` first, so non-string values are
    accepted, and then hashed with ``hashutil.tagged_hash`` under the
    ``"renew_blah"`` tag.
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return the lease cancel secret for ``tag``.

    ``tag`` is converted with ``str()`` first, so non-string values are
    accepted, and then hashed with ``hashutil.tagged_hash`` under the
    ``"cancel_blah"`` tag.
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
826 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
827 write_enabler = self.write_enabler(we_tag)
828 renew_secret = self.renew_secret(lease_tag)
829 cancel_secret = self.cancel_secret(lease_tag)
830 rstaraw = ss.remote_slot_testv_and_readv_and_writev
831 testandwritev = dict( [ (shnum, ([], [], None) )
832 for shnum in sharenums ] )
834 rc = rstaraw(storage_index,
835 (write_enabler, renew_secret, cancel_secret),
838 (did_write, readv_data) = rc
839 self.failUnless(did_write)
840 self.failUnless(isinstance(readv_data, dict))
841 self.failUnlessEqual(len(readv_data), 0)
843 def test_bad_magic(self):
844 ss = self.create("test_bad_magic")
845 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
846 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
851 read = ss.remote_slot_readv
852 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
853 read, "si1", [0], [(0,10)])
854 self.failUnlessIn(" had magic ", str(e))
855 self.failUnlessIn(" but we wanted ", str(e))
857 def test_container_size(self):
858 ss = self.create("test_container_size")
859 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
861 read = ss.remote_slot_readv
862 rstaraw = ss.remote_slot_testv_and_readv_and_writev
863 secrets = ( self.write_enabler("we1"),
864 self.renew_secret("we1"),
865 self.cancel_secret("we1") )
866 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
867 answer = rstaraw("si1", secrets,
868 {0: ([], [(0,data)], len(data)+12)},
870 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
872 # trying to make the container too large will raise an exception
873 TOOBIG = MutableShareFile.MAX_SIZE + 10
874 self.failUnlessRaises(DataTooLargeError,
875 rstaraw, "si1", secrets,
876 {0: ([], [(0,data)], TOOBIG)},
879 # it should be possible to make the container smaller, although at
880 # the moment this doesn't actually affect the share, unless the
881 # container size is dropped to zero, in which case the share is
883 answer = rstaraw("si1", secrets,
884 {0: ([], [(0,data)], len(data)+8)},
886 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
888 answer = rstaraw("si1", secrets,
889 {0: ([], [(0,data)], 0)},
891 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
893 read_answer = read("si1", [0], [(0,10)])
894 self.failUnlessEqual(read_answer, {})
896 def test_allocate(self):
897 ss = self.create("test_allocate")
898 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
901 read = ss.remote_slot_readv
902 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
904 self.failUnlessEqual(read("si1", [], [(0, 10)]),
905 {0: [""], 1: [""], 2: [""]})
906 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
910 secrets = ( self.write_enabler("we1"),
911 self.renew_secret("we1"),
912 self.cancel_secret("we1") )
913 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
914 write = ss.remote_slot_testv_and_readv_and_writev
915 answer = write("si1", secrets,
916 {0: ([], [(0,data)], None)},
918 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
920 self.failUnlessEqual(read("si1", [0], [(0,20)]),
921 {0: ["00000000001111111111"]})
922 self.failUnlessEqual(read("si1", [0], [(95,10)]),
924 #self.failUnlessEqual(s0.remote_get_length(), 100)
926 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
927 f = self.failUnlessRaises(BadWriteEnablerError,
928 write, "si1", bad_secrets,
930 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
932 # this testv should fail
933 answer = write("si1", secrets,
934 {0: ([(0, 12, "eq", "444444444444"),
935 (20, 5, "eq", "22222"),
942 self.failUnlessEqual(answer, (False,
943 {0: ["000000000011", "22222"],
947 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
950 answer = write("si1", secrets,
951 {0: ([(10, 5, "lt", "11111"),
958 self.failUnlessEqual(answer, (False,
963 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
966 def test_operators(self):
967 # test operators, the data we're comparing is '11111' in all cases.
968 # test both fail+pass, reset data after each one.
969 ss = self.create("test_operators")
971 secrets = ( self.write_enabler("we1"),
972 self.renew_secret("we1"),
973 self.cancel_secret("we1") )
974 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
975 write = ss.remote_slot_testv_and_readv_and_writev
976 read = ss.remote_slot_readv
979 write("si1", secrets,
980 {0: ([], [(0,data)], None)},
986 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
991 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
992 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
993 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
996 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1001 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1002 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1005 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1010 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1011 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1015 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1020 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1021 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1024 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1029 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1030 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1033 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1038 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1039 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1043 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1048 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1049 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1052 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1057 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1058 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1062 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1067 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1068 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1071 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1076 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1077 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1081 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1086 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1087 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1090 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1095 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1096 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1099 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1104 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1105 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1109 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1114 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1115 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1118 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1123 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1124 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1127 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1132 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1133 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1136 # finally, test some operators against empty shares
1137 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1142 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1143 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1146 def test_readv(self):
1147 ss = self.create("test_readv")
1148 secrets = ( self.write_enabler("we1"),
1149 self.renew_secret("we1"),
1150 self.cancel_secret("we1") )
1151 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1152 write = ss.remote_slot_testv_and_readv_and_writev
1153 read = ss.remote_slot_readv
1154 data = [("%d" % i) * 100 for i in range(3)]
1155 rc = write("si1", secrets,
1156 {0: ([], [(0,data[0])], None),
1157 1: ([], [(0,data[1])], None),
1158 2: ([], [(0,data[2])], None),
1160 self.failUnlessEqual(rc, (True, {}))
1162 answer = read("si1", [], [(0, 10)])
1163 self.failUnlessEqual(answer, {0: ["0"*10],
1167 def compare_leases_without_timestamps(self, leases_a, leases_b):
1168 self.failUnlessEqual(len(leases_a), len(leases_b))
1169 for i in range(len(leases_a)):
1172 self.failUnlessEqual(a.owner_num, b.owner_num)
1173 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1174 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1175 self.failUnlessEqual(a.nodeid, b.nodeid)
1177 def compare_leases(self, leases_a, leases_b):
1178 self.failUnlessEqual(len(leases_a), len(leases_b))
1179 for i in range(len(leases_a)):
1182 self.failUnlessEqual(a.owner_num, b.owner_num)
1183 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1184 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1185 self.failUnlessEqual(a.nodeid, b.nodeid)
1186 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1188 def test_leases(self):
1189 ss = self.create("test_leases")
1191 return ( self.write_enabler("we1"),
1192 self.renew_secret("we1-%d" % n),
1193 self.cancel_secret("we1-%d" % n) )
1194 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1195 write = ss.remote_slot_testv_and_readv_and_writev
1196 read = ss.remote_slot_readv
1197 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1198 self.failUnlessEqual(rc, (True, {}))
1200 # create a random non-numeric file in the bucket directory, to
1201 # exercise the code that's supposed to ignore those.
1202 bucket_dir = os.path.join(self.workdir("test_leases"),
1203 "shares", storage_index_to_dir("si1"))
1204 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1205 f.write("you ought to be ignoring me\n")
1208 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1209 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1211 # add-lease on a missing storage index is silently ignored
1212 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1214 # re-allocate the slots and use the same secrets, that should update
1216 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1217 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1220 ss.remote_renew_lease("si1", secrets(0)[1])
1221 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1223 # now allocate them with a bunch of different secrets, to trigger the
1224 # extended lease code. Use add_lease for one of them.
1225 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1226 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1227 secrets2 = secrets(2)
1228 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1229 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1230 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1231 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1232 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1234 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1236 # cancel one of them
1237 ss.remote_cancel_lease("si1", secrets(5)[2])
1238 self.failUnlessEqual(len(list(s0.get_leases())), 5)
1240 all_leases = list(s0.get_leases())
1241 # and write enough data to expand the container, forcing the server
1242 # to move the leases
1243 write("si1", secrets(0),
1244 {0: ([], [(0,data)], 200), },
1247 # read back the leases, make sure they're still intact.
1248 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1250 ss.remote_renew_lease("si1", secrets(0)[1])
1251 ss.remote_renew_lease("si1", secrets(1)[1])
1252 ss.remote_renew_lease("si1", secrets(2)[1])
1253 ss.remote_renew_lease("si1", secrets(3)[1])
1254 ss.remote_renew_lease("si1", secrets(4)[1])
1255 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1256 # get a new copy of the leases, with the current timestamps. Reading
1257 # data and failing to renew/cancel leases should leave the timestamps
1259 all_leases = list(s0.get_leases())
1260 # renewing with a bogus token should prompt an error message
1262 # examine the exception thus raised, make sure the old nodeid is
1263 # present, to provide for share migration
1264 e = self.failUnlessRaises(IndexError,
1265 ss.remote_renew_lease, "si1",
1268 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1269 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1270 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1272 # same for cancelling
1273 self.failUnlessRaises(IndexError,
1274 ss.remote_cancel_lease, "si1",
1276 self.compare_leases(all_leases, list(s0.get_leases()))
1278 # reading shares should not modify the timestamp
1279 read("si1", [], [(0,200)])
1280 self.compare_leases(all_leases, list(s0.get_leases()))
1282 write("si1", secrets(0),
1283 {0: ([], [(200, "make me bigger")], None)}, [])
1284 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1286 write("si1", secrets(0),
1287 {0: ([], [(500, "make me really bigger")], None)}, [])
1288 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1290 # now cancel them all
1291 ss.remote_cancel_lease("si1", secrets(0)[2])
1292 ss.remote_cancel_lease("si1", secrets(1)[2])
1293 ss.remote_cancel_lease("si1", secrets(2)[2])
1294 ss.remote_cancel_lease("si1", secrets(3)[2])
1296 # the slot should still be there
1297 remaining_shares = read("si1", [], [(0,10)])
1298 self.failUnlessEqual(len(remaining_shares), 1)
1299 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1301 # cancelling a non-existent lease should raise an IndexError
1302 self.failUnlessRaises(IndexError,
1303 ss.remote_cancel_lease, "si1", "nonsecret")
1305 # and the slot should still be there
1306 remaining_shares = read("si1", [], [(0,10)])
1307 self.failUnlessEqual(len(remaining_shares), 1)
1308 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1310 ss.remote_cancel_lease("si1", secrets(4)[2])
1311 # now the slot should be gone
1312 no_shares = read("si1", [], [(0,10)])
1313 self.failUnlessEqual(no_shares, {})
1315 # cancelling a lease on a non-existent share should raise an IndexError
1316 self.failUnlessRaises(IndexError,
1317 ss.remote_cancel_lease, "si2", "nonsecret")
1319 def test_remove(self):
1320 ss = self.create("test_remove")
1321 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1323 readv = ss.remote_slot_readv
1324 writev = ss.remote_slot_testv_and_readv_and_writev
1325 secrets = ( self.write_enabler("we1"),
1326 self.renew_secret("we1"),
1327 self.cancel_secret("we1") )
1328 # delete sh0 by setting its size to zero
1329 answer = writev("si1", secrets,
1332 # the answer should mention all the shares that existed before the
1334 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1335 # but a new read should show only sh1 and sh2
1336 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1339 # delete sh1 by setting its size to zero
1340 answer = writev("si1", secrets,
1343 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1344 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1347 # delete sh2 by setting its size to zero
1348 answer = writev("si1", secrets,
1351 self.failUnlessEqual(answer, (True, {2:[]}) )
1352 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1354 # and the bucket directory should now be gone
1355 si = base32.b2a("si1")
1356 # note: this is a detail of the storage server implementation, and
1357 # may change in the future
1359 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1360 bucketdir = os.path.join(prefixdir, si)
1361 self.failUnless(os.path.exists(prefixdir), prefixdir)
1362 self.failIf(os.path.exists(bucketdir), bucketdir)
1365 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1367 self.sparent = LoggingServiceParent()
1368 self._lease_secret = itertools.count()
1369 self.ss = self.create("MDMFProxies storage test server")
1370 self.rref = RemoteBucket()
1371 self.rref.target = self.ss
1372 self.secrets = (self.write_enabler("we_secret"),
1373 self.renew_secret("renew_secret"),
1374 self.cancel_secret("cancel_secret"))
1375 self.segment = "aaaaaa"
1377 self.salt = "a" * 16
1378 self.block_hash = "a" * 32
1379 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1380 self.share_hash = self.block_hash
1381 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1382 self.signature = "foobarbaz"
1383 self.verification_key = "vvvvvv"
1384 self.encprivkey = "private"
1385 self.root_hash = self.block_hash
1386 self.salt_hash = self.root_hash
1387 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1388 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1389 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1390 # blockhashes and salt hashes are serialized in the same way,
1391 # only we lop off the first element and store that in the
1393 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1397 self.sparent.stopService()
1398 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
def write_enabler(self, we_tag):
    """Return the write-enabler secret derived from we_tag.

    The secret is hashutil.tagged_hash() of we_tag under a fixed tag, so
    the same we_tag always yields the same value.
    """
    fixed_tag = "we_blah"
    return hashutil.tagged_hash(fixed_tag, we_tag)
def renew_secret(self, tag):
    """Return the renew secret derived from tag.

    tag is passed through str() before hashing, so non-string tags
    (e.g. integers) are accepted.
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return the cancel secret derived from tag.

    tag is passed through str() before hashing, so non-string tags
    (e.g. integers) are accepted.
    """
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
1413 def workdir(self, name):
1414 basedir = os.path.join("storage", "MutableServer", name)
1418 def create(self, name):
1419 workdir = self.workdir(name)
1420 ss = StorageServer(workdir, "\x00" * 20)
1421 ss.setServiceParent(self.sparent)
1425 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1426 # Start with the checkstring
1427 data = struct.pack(">BQ32s",
1431 self.checkstring = data
1432 # Next, the encoding parameters
1434 data += struct.pack(">BBQQ",
1440 data += struct.pack(">BBQQ",
1446 data += struct.pack(">BBQQ",
1451 # Now we'll build the offsets.
1453 if not tail_segment and not empty:
1455 sharedata += self.salt + self.block
1458 sharedata += self.salt + self.block
1459 sharedata += self.salt + "a"
1461 # The encrypted private key comes after the shares + salts
1462 offset_size = struct.calcsize(MDMFOFFSETS)
1463 encrypted_private_key_offset = len(data) + offset_size
# The share hash chain comes after the private key
1465 sharehashes_offset = encrypted_private_key_offset + \
1466 len(self.encprivkey)
1468 # The signature comes after the share hash chain.
1469 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1471 verification_key_offset = signature_offset + len(self.signature)
1472 verification_key_end = verification_key_offset + \
1473 len(self.verification_key)
1475 share_data_offset = offset_size
1476 share_data_offset += PRIVATE_KEY_SIZE
1477 share_data_offset += SIGNATURE_SIZE
1478 share_data_offset += VERIFICATION_KEY_SIZE
1479 share_data_offset += SHARE_HASH_CHAIN_SIZE
1481 blockhashes_offset = share_data_offset + len(sharedata)
1482 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1484 data += struct.pack(MDMFOFFSETS,
1485 encrypted_private_key_offset,
1488 verification_key_offset,
1489 verification_key_end,
1495 self.offsets['enc_privkey'] = encrypted_private_key_offset
1496 self.offsets['block_hash_tree'] = blockhashes_offset
1497 self.offsets['share_hash_chain'] = sharehashes_offset
1498 self.offsets['signature'] = signature_offset
1499 self.offsets['verification_key'] = verification_key_offset
1500 self.offsets['share_data'] = share_data_offset
1501 self.offsets['verification_key_end'] = verification_key_end
1502 self.offsets['EOF'] = eof_offset
1505 data += self.encprivkey
1507 data += self.share_hash_chain_s
1509 data += self.signature
1510 # and the verification key
1511 data += self.verification_key
1512 # Then we'll add in gibberish until we get to the right point.
1513 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1516 # Then the share data
1519 data += self.block_hash_tree_s
1523 def write_test_share_to_server(self,
1528 I write some data for the read tests to read to self.ss
1530 If tail_segment=True, then I will write a share that has a
1531 smaller tail segment than other segments.
1533 write = self.ss.remote_slot_testv_and_readv_and_writev
1534 data = self.build_test_mdmf_share(tail_segment, empty)
1535 # Finally, we write the whole thing to the storage server in one
1537 testvs = [(0, 1, "eq", "")]
1539 tws[0] = (testvs, [(0, data)], None)
1541 results = write(storage_index, self.secrets, tws, readv)
1542 self.failUnless(results[0])
1545 def build_test_sdmf_share(self, empty=False):
1549 sharedata = self.segment * 6
1550 self.sharedata = sharedata
1551 blocksize = len(sharedata) / 3
1552 block = sharedata[:blocksize]
1553 self.blockdata = block
1554 prefix = struct.pack(">BQ32s16s BBQQ",
1564 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1565 signature_offset = post_offset + len(self.verification_key)
1566 sharehashes_offset = signature_offset + len(self.signature)
1567 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1568 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1569 encprivkey_offset = sharedata_offset + len(block)
1570 eof_offset = encprivkey_offset + len(self.encprivkey)
1571 offsets = struct.pack(">LLLLQQ",
1578 final_share = "".join([prefix,
1580 self.verification_key,
1582 self.share_hash_chain_s,
1583 self.block_hash_tree_s,
1587 self.offsets['signature'] = signature_offset
1588 self.offsets['share_hash_chain'] = sharehashes_offset
1589 self.offsets['block_hash_tree'] = blockhashes_offset
1590 self.offsets['share_data'] = sharedata_offset
1591 self.offsets['enc_privkey'] = encprivkey_offset
1592 self.offsets['EOF'] = eof_offset
1596 def write_sdmf_share_to_server(self,
1599 # Some tests need SDMF shares to verify that we can still
1600 # read them. This method writes one, which resembles but is not
1602 write = self.ss.remote_slot_testv_and_readv_and_writev
1603 share = self.build_test_sdmf_share(empty)
1604 testvs = [(0, 1, "eq", "")]
1606 tws[0] = (testvs, [(0, share)], None)
1608 results = write(storage_index, self.secrets, tws, readv)
1609 self.failUnless(results[0])
1612 def test_read(self):
1613 self.write_test_share_to_server("si1")
1614 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1615 # Check that every method equals what we expect it to.
1616 d = defer.succeed(None)
1617 def _check_block_and_salt((block, salt)):
1618 self.failUnlessEqual(block, self.block)
1619 self.failUnlessEqual(salt, self.salt)
1622 d.addCallback(lambda ignored, i=i:
1623 mr.get_block_and_salt(i))
1624 d.addCallback(_check_block_and_salt)
1626 d.addCallback(lambda ignored:
1627 mr.get_encprivkey())
1628 d.addCallback(lambda encprivkey:
1629 self.failUnlessEqual(self.encprivkey, encprivkey))
1631 d.addCallback(lambda ignored:
1632 mr.get_blockhashes())
1633 d.addCallback(lambda blockhashes:
1634 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1636 d.addCallback(lambda ignored:
1637 mr.get_sharehashes())
1638 d.addCallback(lambda sharehashes:
1639 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1641 d.addCallback(lambda ignored:
1643 d.addCallback(lambda signature:
1644 self.failUnlessEqual(signature, self.signature))
1646 d.addCallback(lambda ignored:
1647 mr.get_verification_key())
1648 d.addCallback(lambda verification_key:
1649 self.failUnlessEqual(verification_key, self.verification_key))
1651 d.addCallback(lambda ignored:
1653 d.addCallback(lambda seqnum:
1654 self.failUnlessEqual(seqnum, 0))
1656 d.addCallback(lambda ignored:
1658 d.addCallback(lambda root_hash:
1659 self.failUnlessEqual(self.root_hash, root_hash))
1661 d.addCallback(lambda ignored:
1663 d.addCallback(lambda seqnum:
1664 self.failUnlessEqual(0, seqnum))
1666 d.addCallback(lambda ignored:
1667 mr.get_encoding_parameters())
def _check_encoding_parameters(params):
    # The callback receives a single 4-tuple (k, n, segsize, datalen);
    # unpack it in the body rather than in the signature.
    k, n, segsize, datalen = params
    expectations = ((k, 3), (n, 10), (segsize, 6), (datalen, 36))
    # Checked one at a time, in the same order, so the first mismatch is
    # the one reported.
    for actual, expected in expectations:
        self.failUnlessEqual(actual, expected)
1673 d.addCallback(_check_encoding_parameters)
1675 d.addCallback(lambda ignored:
1676 mr.get_checkstring())
# Compare the checkstring returned by the read proxy with the one recorded
# in self.checkstring when the test share was built. Comparing the returned
# value with itself could never fail.
d.addCallback(lambda checkstring:
    self.failUnlessEqual(checkstring, self.checkstring))
1682 def test_read_with_different_tail_segment_size(self):
1683 self.write_test_share_to_server("si1", tail_segment=True)
1684 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1685 d = mr.get_block_and_salt(5)
def _check_tail_segment(results):
    # results is the (block, salt) pair delivered by get_block_and_salt(5);
    # only the block is inspected here.
    tail_block, _salt = results
    self.failUnlessEqual(len(tail_block), 1)
    self.failUnlessEqual(tail_block, "a")
1690 d.addCallback(_check_tail_segment)
1694 def test_get_block_with_invalid_segnum(self):
1695 self.write_test_share_to_server("si1")
1696 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1697 d = defer.succeed(None)
1698 d.addCallback(lambda ignored:
1699 self.shouldFail(LayoutInvalid, "test invalid segnum",
1701 mr.get_block_and_salt, 7))
1705 def test_get_encoding_parameters_first(self):
1706 self.write_test_share_to_server("si1")
1707 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1708 d = mr.get_encoding_parameters()
def _check_encoding_parameters(encoding_parameters):
    # get_encoding_parameters() fires with one 4-tuple; unpack it here
    # instead of in the parameter list.
    k, n, segment_size, datalen = encoding_parameters
    self.failUnlessEqual(k, 3)
    self.failUnlessEqual(n, 10)
    self.failUnlessEqual(segment_size, 6)
    self.failUnlessEqual(datalen, 36)
1714 d.addCallback(_check_encoding_parameters)
1718 def test_get_seqnum_first(self):
1719 self.write_test_share_to_server("si1")
1720 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1722 d.addCallback(lambda seqnum:
1723 self.failUnlessEqual(seqnum, 0))
1727 def test_get_root_hash_first(self):
1728 self.write_test_share_to_server("si1")
1729 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1730 d = mr.get_root_hash()
1731 d.addCallback(lambda root_hash:
1732 self.failUnlessEqual(root_hash, self.root_hash))
1736 def test_get_checkstring_first(self):
1737 self.write_test_share_to_server("si1")
1738 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1739 d = mr.get_checkstring()
1740 d.addCallback(lambda checkstring:
1741 self.failUnlessEqual(checkstring, self.checkstring))
1745 def test_write_read_vectors(self):
1746 # When writing for us, the storage server will return to us a
1747 # read vector, along with its result. If a write fails because
1748 # the test vectors failed, this read vector can help us to
1749 # diagnose the problem. This test ensures that the read vector
1750 # is working appropriately.
1751 mw = self._make_new_mw("si1", 0)
1754 mw.put_block(self.block, i, self.salt)
1755 mw.put_encprivkey(self.encprivkey)
1756 mw.put_blockhashes(self.block_hash_tree)
1757 mw.put_sharehashes(self.share_hash_chain)
1758 mw.put_root_hash(self.root_hash)
1759 mw.put_signature(self.signature)
1760 mw.put_verification_key(self.verification_key)
1761 d = mw.finish_publishing()
# failUnless() takes (condition, msg): the original call passed 2 as the
# failure message and only checked that len(results) was non-zero.
self.failUnlessEqual(len(results), 2)
1764 result, readv = results
1765 self.failUnless(result)
1767 self.old_checkstring = mw.get_checkstring()
1768 mw.set_checkstring("")
1769 d.addCallback(_then)
1770 d.addCallback(lambda ignored:
1771 mw.finish_publishing())
1772 def _then_again(results):
1773 self.failUnlessEqual(len(results), 2)
1774 result, readvs = results
1776 self.failUnlessIn(0, readvs)
1777 readv = readvs[0][0]
1778 self.failUnlessEqual(readv, self.old_checkstring)
1779 d.addCallback(_then_again)
1780 # The checkstring remains the same for the rest of the process.
1784 def test_private_key_after_share_hash_chain(self):
1785 mw = self._make_new_mw("si1", 0)
1786 d = defer.succeed(None)
1788 d.addCallback(lambda ignored, i=i:
1789 mw.put_block(self.block, i, self.salt))
1790 d.addCallback(lambda ignored:
1791 mw.put_encprivkey(self.encprivkey))
1792 d.addCallback(lambda ignored:
1793 mw.put_sharehashes(self.share_hash_chain))
1795 # Now try to put the private key again.
1796 d.addCallback(lambda ignored:
1797 self.shouldFail(LayoutInvalid, "test repeat private key",
1799 mw.put_encprivkey, self.encprivkey))
1803 def test_signature_after_verification_key(self):
1804 mw = self._make_new_mw("si1", 0)
1805 d = defer.succeed(None)
1806 # Put everything up to and including the verification key.
1808 d.addCallback(lambda ignored, i=i:
1809 mw.put_block(self.block, i, self.salt))
1810 d.addCallback(lambda ignored:
1811 mw.put_encprivkey(self.encprivkey))
1812 d.addCallback(lambda ignored:
1813 mw.put_blockhashes(self.block_hash_tree))
1814 d.addCallback(lambda ignored:
1815 mw.put_sharehashes(self.share_hash_chain))
1816 d.addCallback(lambda ignored:
1817 mw.put_root_hash(self.root_hash))
1818 d.addCallback(lambda ignored:
1819 mw.put_signature(self.signature))
1820 d.addCallback(lambda ignored:
1821 mw.put_verification_key(self.verification_key))
1822 # Now try to put the signature again. This should fail
1823 d.addCallback(lambda ignored:
1824 self.shouldFail(LayoutInvalid, "signature after verification",
1826 mw.put_signature, self.signature))
1830 def test_uncoordinated_write(self):
1831 # Make two mutable writers, both pointing to the same storage
1832 # server, both at the same storage index, and try writing to the
1834 mw1 = self._make_new_mw("si1", 0)
1835 mw2 = self._make_new_mw("si1", 0)
1837 def _check_success(results):
1838 result, readvs = results
1839 self.failUnless(result)
1841 def _check_failure(results):
1842 result, readvs = results
1845 def _write_share(mw):
1847 mw.put_block(self.block, i, self.salt)
1848 mw.put_encprivkey(self.encprivkey)
1849 mw.put_blockhashes(self.block_hash_tree)
1850 mw.put_sharehashes(self.share_hash_chain)
1851 mw.put_root_hash(self.root_hash)
1852 mw.put_signature(self.signature)
1853 mw.put_verification_key(self.verification_key)
1854 return mw.finish_publishing()
1855 d = _write_share(mw1)
1856 d.addCallback(_check_success)
1857 d.addCallback(lambda ignored:
1859 d.addCallback(_check_failure)
1863 def test_invalid_salt_size(self):
1864 # Salts need to be 16 bytes in size. Writes that attempt to
1865 # write more or less than this should be rejected.
1866 mw = self._make_new_mw("si1", 0)
1867 invalid_salt = "a" * 17 # 17 bytes
1868 another_invalid_salt = "b" * 15 # 15 bytes
1869 d = defer.succeed(None)
1870 d.addCallback(lambda ignored:
1871 self.shouldFail(LayoutInvalid, "salt too big",
1873 mw.put_block, self.block, 0, invalid_salt))
1874 d.addCallback(lambda ignored:
1875 self.shouldFail(LayoutInvalid, "salt too small",
1877 mw.put_block, self.block, 0,
1878 another_invalid_salt))
1882 def test_write_test_vectors(self):
1883 # If we give the write proxy a bogus test vector at
1884 # any point during the process, it should fail to write when we
1886 def _check_failure(results):
1887 self.failUnlessEqual(len(results), 2)
1891 def _check_success(results):
1892 self.failUnlessEqual(len(results), 2)
# results is a 2-tuple (its length is asserted just above), so testing the
# tuple itself is always true. The success flag is its first element.
self.failUnless(results[0])
1896 mw = self._make_new_mw("si1", 0)
1897 mw.set_checkstring("this is a lie")
1899 mw.put_block(self.block, i, self.salt)
1900 mw.put_encprivkey(self.encprivkey)
1901 mw.put_blockhashes(self.block_hash_tree)
1902 mw.put_sharehashes(self.share_hash_chain)
1903 mw.put_root_hash(self.root_hash)
1904 mw.put_signature(self.signature)
1905 mw.put_verification_key(self.verification_key)
1906 d = mw.finish_publishing()
1907 d.addCallback(_check_failure)
1908 d.addCallback(lambda ignored:
1909 mw.set_checkstring(""))
1910 d.addCallback(lambda ignored:
1911 mw.finish_publishing())
1912 d.addCallback(_check_success)
def serialize_blockhashes(self, blockhashes):
    """Return the block hashes concatenated, in order, with no separator."""
    separator = ""
    return separator.join(blockhashes)
1920 def serialize_sharehashes(self, sharehashes):
1921 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1922 for i in sorted(sharehashes.keys())])
1926 def test_write(self):
1927 # This translates to a file with 6 6-byte segments, and with 2-byte
1929 mw = self._make_new_mw("si1", 0)
1930 # Test writing some blocks.
1931 read = self.ss.remote_slot_readv
1932 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1933 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1934 PRIVATE_KEY_SIZE + \
1936 VERIFICATION_KEY_SIZE + \
1937 SHARE_HASH_CHAIN_SIZE
1938 written_block_size = 2 + len(self.salt)
1939 written_block = self.block + self.salt
1941 mw.put_block(self.block, i, self.salt)
1943 mw.put_encprivkey(self.encprivkey)
1944 mw.put_blockhashes(self.block_hash_tree)
1945 mw.put_sharehashes(self.share_hash_chain)
1946 mw.put_root_hash(self.root_hash)
1947 mw.put_signature(self.signature)
1948 mw.put_verification_key(self.verification_key)
1949 d = mw.finish_publishing()
1950 def _check_publish(results):
1951 self.failUnlessEqual(len(results), 2)
1952 result, ign = results
1953 self.failUnless(result, "publish failed")
1955 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1956 {0: [written_block]})
1958 self.failUnlessEqual(len(self.encprivkey), 7)
1959 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1960 {0: [self.encprivkey]})
1962 expected_block_hash_offset = expected_sharedata_offset + \
1963 (6 * written_block_size)
1964 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1965 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1966 {0: [self.block_hash_tree_s]})
1968 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1969 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1970 {0: [self.share_hash_chain_s]})
1972 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1973 {0: [self.root_hash]})
1974 expected_signature_offset = expected_share_hash_offset + \
1975 len(self.share_hash_chain_s)
1976 self.failUnlessEqual(len(self.signature), 9)
1977 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1978 {0: [self.signature]})
1980 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1981 self.failUnlessEqual(len(self.verification_key), 6)
1982 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1983 {0: [self.verification_key]})
1985 signable = mw.get_signable()
1986 verno, seq, roothash, k, n, segsize, datalen = \
1987 struct.unpack(">BQ32sBBQQ",
1989 self.failUnlessEqual(verno, 1)
1990 self.failUnlessEqual(seq, 0)
1991 self.failUnlessEqual(roothash, self.root_hash)
1992 self.failUnlessEqual(k, 3)
1993 self.failUnlessEqual(n, 10)
1994 self.failUnlessEqual(segsize, 6)
1995 self.failUnlessEqual(datalen, 36)
1996 expected_eof_offset = expected_block_hash_offset + \
1997 len(self.block_hash_tree_s)
1999 # Check the version number to make sure that it is correct.
2000 expected_version_number = struct.pack(">B", 1)
2001 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2002 {0: [expected_version_number]})
2003 # Check the sequence number to make sure that it is correct
2004 expected_sequence_number = struct.pack(">Q", 0)
2005 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2006 {0: [expected_sequence_number]})
# Check that the encoding parameters (k, N, segment size, data
2008 # length) are what they should be. These are 3, 10, 6, 36
2009 expected_k = struct.pack(">B", 3)
2010 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2012 expected_n = struct.pack(">B", 10)
2013 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2015 expected_segment_size = struct.pack(">Q", 6)
2016 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2017 {0: [expected_segment_size]})
2018 expected_data_length = struct.pack(">Q", 36)
2019 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2020 {0: [expected_data_length]})
2021 expected_offset = struct.pack(">Q", expected_private_key_offset)
2022 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2023 {0: [expected_offset]})
2024 expected_offset = struct.pack(">Q", expected_share_hash_offset)
2025 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2026 {0: [expected_offset]})
2027 expected_offset = struct.pack(">Q", expected_signature_offset)
2028 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2029 {0: [expected_offset]})
2030 expected_offset = struct.pack(">Q", expected_verification_key_offset)
2031 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2032 {0: [expected_offset]})
2033 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2034 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2035 {0: [expected_offset]})
2036 expected_offset = struct.pack(">Q", expected_sharedata_offset)
2037 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2038 {0: [expected_offset]})
2039 expected_offset = struct.pack(">Q", expected_block_hash_offset)
2040 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2041 {0: [expected_offset]})
2042 expected_offset = struct.pack(">Q", expected_eof_offset)
2043 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2044 {0: [expected_offset]})
2045 d.addCallback(_check_publish)
2048 def _make_new_mw(self, si, share, datalength=36):
2049 # This is a file of size 36 bytes. Since it has a segment
2050 # size of 6, we know that it has 6 byte segments, which will
2051 # be split into blocks of 2 bytes because our FEC k
2053 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2058 def test_write_rejected_with_too_many_blocks(self):
2059 mw = self._make_new_mw("si0", 0)
2061 # Try writing too many blocks. We should not be able to write
2063 # blocks into each share.
2064 d = defer.succeed(None)
2066 d.addCallback(lambda ignored, i=i:
2067 mw.put_block(self.block, i, self.salt))
2068 d.addCallback(lambda ignored:
2069 self.shouldFail(LayoutInvalid, "too many blocks",
2071 mw.put_block, self.block, 7, self.salt))
2075 def test_write_rejected_with_invalid_salt(self):
2076 # Try writing an invalid salt. Salts are 16 bytes -- any more or
2077 # less should cause an error.
2078 mw = self._make_new_mw("si1", 0)
2079 bad_salt = "a" * 17 # 17 bytes
2080 d = defer.succeed(None)
2081 d.addCallback(lambda ignored:
2082 self.shouldFail(LayoutInvalid, "test_invalid_salt",
2083 None, mw.put_block, self.block, 7, bad_salt))
2087 def test_write_rejected_with_invalid_root_hash(self):
2088 # Try writing an invalid root hash. This should be SHA256d, and
2089 # 32 bytes long as a result.
2090 mw = self._make_new_mw("si2", 0)
2091 # 17 bytes != 32 bytes
2092 invalid_root_hash = "a" * 17
2093 d = defer.succeed(None)
2094 # Before this test can work, we need to put some blocks + salts,
2095 # a block hash tree, and a share hash tree. Otherwise, we'll see
2096 # failures that match what we are looking for, but are caused by
2097 # the constraints imposed on operation ordering.
2099 d.addCallback(lambda ignored, i=i:
2100 mw.put_block(self.block, i, self.salt))
2101 d.addCallback(lambda ignored:
2102 mw.put_encprivkey(self.encprivkey))
2103 d.addCallback(lambda ignored:
2104 mw.put_blockhashes(self.block_hash_tree))
2105 d.addCallback(lambda ignored:
2106 mw.put_sharehashes(self.share_hash_chain))
2107 d.addCallback(lambda ignored:
2108 self.shouldFail(LayoutInvalid, "invalid root hash",
2109 None, mw.put_root_hash, invalid_root_hash))
2113 def test_write_rejected_with_invalid_blocksize(self):
2114 # The blocksize implied by the writer that we get from
2115 # _make_new_mw is 2 bytes -- any more or any less than this
2116 # should be cause for failure, unless it is the tail segment, in
2117 # which case it may not be failure.
2119 mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2121 # 1 byte != 2 bytes
2122 d = defer.succeed(None)
2123 d.addCallback(lambda ignored, invalid_block=invalid_block:
2124 self.shouldFail(LayoutInvalid, "test blocksize too small",
2125 None, mw.put_block, invalid_block, 0,
2127 invalid_block = invalid_block * 3
2128 # 3 bytes != 2 bytes
2129 d.addCallback(lambda ignored:
2130 self.shouldFail(LayoutInvalid, "test blocksize too large",
2132 mw.put_block, invalid_block, 0, self.salt))
2134 d.addCallback(lambda ignored, i=i:
2135 mw.put_block(self.block, i, self.salt))
2136 # Try to put an invalid tail segment
2137 d.addCallback(lambda ignored:
2138 self.shouldFail(LayoutInvalid, "test invalid tail segment",
2140 mw.put_block, self.block, 5, self.salt))
2142 d.addCallback(lambda ignored:
2143 mw.put_block(valid_block, 5, self.salt))
2147 def test_write_enforces_order_constraints(self):
2148 # We require that the MDMFSlotWriteProxy be interacted with in a
2152 # 1: write blocks and salts
2153 # 2: Write the encrypted private key
2154 # 3: Write the block hashes
2155 # 4: Write the share hashes
2156 # 5: Write the root hash and salt hash
2157 # 6: Write the signature and verification key
2158 # 7: Write the file.
2160 # Some of these can be performed out-of-order, and some can't.
2161 # The dependencies that I want to test here are:
2162 # - Private key before block hashes
2163 # - share hashes and block hashes before root hash
2164 # - root hash before signature
2165 # - signature before verification key
2166 mw0 = self._make_new_mw("si0", 0)
2168 d = defer.succeed(None)
2170 d.addCallback(lambda ignored, i=i:
2171 mw0.put_block(self.block, i, self.salt))
2173 # Try to write the share hash chain without writing the
2174 # encrypted private key
2175 d.addCallback(lambda ignored:
2176 self.shouldFail(LayoutInvalid, "share hash chain before "
2179 mw0.put_sharehashes, self.share_hash_chain))
2180 # Write the private key.
2181 d.addCallback(lambda ignored:
2182 mw0.put_encprivkey(self.encprivkey))
2184 # Now write the block hashes and try again
2185 d.addCallback(lambda ignored:
2186 mw0.put_blockhashes(self.block_hash_tree))
2188 # We haven't yet put the root hash on the share, so we shouldn't
2189 # be able to sign it.
2190 d.addCallback(lambda ignored:
2191 self.shouldFail(LayoutInvalid, "signature before root hash",
2192 None, mw0.put_signature, self.signature))
2194 d.addCallback(lambda ignored:
2195 self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2197 # ..and, since that fails, we also shouldn't be able to put the
2199 d.addCallback(lambda ignored:
2200 self.shouldFail(LayoutInvalid, "key before signature",
2201 None, mw0.put_verification_key,
2202 self.verification_key))
2204 # Now write the share hashes.
2205 d.addCallback(lambda ignored:
2206 mw0.put_sharehashes(self.share_hash_chain))
2207 # We should be able to write the root hash now too
2208 d.addCallback(lambda ignored:
2209 mw0.put_root_hash(self.root_hash))
2211 # We should still be unable to put the verification key
2212 d.addCallback(lambda ignored:
2213 self.shouldFail(LayoutInvalid, "key before signature",
2214 None, mw0.put_verification_key,
2215 self.verification_key))
2217 d.addCallback(lambda ignored:
2218 mw0.put_signature(self.signature))
2220 # We shouldn't be able to write the offsets to the remote server
2221 # until the offset table is finished; IOW, until we have written
2222 # the verification key.
2223 d.addCallback(lambda ignored:
2224 self.shouldFail(LayoutInvalid, "offsets before verification key",
2226 mw0.finish_publishing))
2228 d.addCallback(lambda ignored:
2229 mw0.put_verification_key(self.verification_key))
2233 def test_end_to_end(self):
2234 mw = self._make_new_mw("si1", 0)
2235 # Write a share using the mutable writer, and make sure that the
2236 # reader knows how to read everything back to us.
2237 d = defer.succeed(None)
2239 d.addCallback(lambda ignored, i=i:
2240 mw.put_block(self.block, i, self.salt))
2241 d.addCallback(lambda ignored:
2242 mw.put_encprivkey(self.encprivkey))
2243 d.addCallback(lambda ignored:
2244 mw.put_blockhashes(self.block_hash_tree))
2245 d.addCallback(lambda ignored:
2246 mw.put_sharehashes(self.share_hash_chain))
2247 d.addCallback(lambda ignored:
2248 mw.put_root_hash(self.root_hash))
2249 d.addCallback(lambda ignored:
2250 mw.put_signature(self.signature))
2251 d.addCallback(lambda ignored:
2252 mw.put_verification_key(self.verification_key))
2253 d.addCallback(lambda ignored:
2254 mw.finish_publishing())
2256 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2257 def _check_block_and_salt((block, salt)):
2258 self.failUnlessEqual(block, self.block)
2259 self.failUnlessEqual(salt, self.salt)
2262 d.addCallback(lambda ignored, i=i:
2263 mr.get_block_and_salt(i))
2264 d.addCallback(_check_block_and_salt)
2266 d.addCallback(lambda ignored:
2267 mr.get_encprivkey())
2268 d.addCallback(lambda encprivkey:
2269 self.failUnlessEqual(self.encprivkey, encprivkey))
2271 d.addCallback(lambda ignored:
2272 mr.get_blockhashes())
2273 d.addCallback(lambda blockhashes:
2274 self.failUnlessEqual(self.block_hash_tree, blockhashes))
2276 d.addCallback(lambda ignored:
2277 mr.get_sharehashes())
2278 d.addCallback(lambda sharehashes:
2279 self.failUnlessEqual(self.share_hash_chain, sharehashes))
2281 d.addCallback(lambda ignored:
2283 d.addCallback(lambda signature:
2284 self.failUnlessEqual(signature, self.signature))
2286 d.addCallback(lambda ignored:
2287 mr.get_verification_key())
2288 d.addCallback(lambda verification_key:
2289 self.failUnlessEqual(verification_key, self.verification_key))
2291 d.addCallback(lambda ignored:
2293 d.addCallback(lambda seqnum:
2294 self.failUnlessEqual(seqnum, 0))
2296 d.addCallback(lambda ignored:
2298 d.addCallback(lambda root_hash:
2299 self.failUnlessEqual(self.root_hash, root_hash))
2301 d.addCallback(lambda ignored:
2302 mr.get_encoding_parameters())
2303 def _check_encoding_parameters((k, n, segsize, datalen)):
2304 self.failUnlessEqual(k, 3)
2305 self.failUnlessEqual(n, 10)
2306 self.failUnlessEqual(segsize, 6)
2307 self.failUnlessEqual(datalen, 36)
2308 d.addCallback(_check_encoding_parameters)
2310 d.addCallback(lambda ignored:
2311 mr.get_checkstring())
2312 d.addCallback(lambda checkstring:
2313 self.failUnlessEqual(checkstring, mw.get_checkstring()))
2317 def test_is_sdmf(self):
2318 # The MDMFSlotReadProxy should also know how to read SDMF files,
2319 # since it will encounter them on the grid. Callers use the
2320 # is_sdmf method to test this.
2321 self.write_sdmf_share_to_server("si1")
2322 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2324 d.addCallback(lambda issdmf:
2325 self.failUnless(issdmf))
2329 def test_reads_sdmf(self):
2330 # The slot read proxy should, naturally, know how to tell us
2331 # about data in the SDMF format
2332 self.write_sdmf_share_to_server("si1")
2333 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2334 d = defer.succeed(None)
2335 d.addCallback(lambda ignored:
2337 d.addCallback(lambda issdmf:
2338 self.failUnless(issdmf))
2340 # What do we need to read?
2343 d.addCallback(lambda ignored:
2344 mr.get_block_and_salt(0))
2345 def _check_block_and_salt(results):
2346 block, salt = results
2347 # Our original file is 36 bytes long. Then each share is 12
2348 # bytes in size. The share is composed entirely of the
2349 # letter a. self.block contains 2 as, so 6 * self.block is
2350 # what we are looking for.
2351 self.failUnlessEqual(block, self.block * 6)
2352 self.failUnlessEqual(salt, self.salt)
2353 d.addCallback(_check_block_and_salt)
2356 d.addCallback(lambda ignored:
2357 mr.get_blockhashes())
2358 d.addCallback(lambda blockhashes:
2359 self.failUnlessEqual(self.block_hash_tree,
2363 d.addCallback(lambda ignored:
2364 mr.get_sharehashes())
2365 d.addCallback(lambda sharehashes:
2366 self.failUnlessEqual(self.share_hash_chain,
2369 d.addCallback(lambda ignored:
2370 mr.get_encprivkey())
2371 d.addCallback(lambda encprivkey:
2372 self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2373 d.addCallback(lambda ignored:
2374 mr.get_verification_key())
2375 d.addCallback(lambda verification_key:
2376 self.failUnlessEqual(verification_key,
2377 self.verification_key,
2380 d.addCallback(lambda ignored:
2382 d.addCallback(lambda signature:
2383 self.failUnlessEqual(signature, self.signature, signature))
2385 # - The sequence number
2386 d.addCallback(lambda ignored:
2388 d.addCallback(lambda seqnum:
2389 self.failUnlessEqual(seqnum, 0, seqnum))
2392 d.addCallback(lambda ignored:
2394 d.addCallback(lambda root_hash:
2395 self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2399 def test_only_reads_one_segment_sdmf(self):
2400 # SDMF shares have only one segment, so it doesn't make sense to
2401 # read more segments than that. The reader should know this and
2402 # complain if we try to do that.
2403 self.write_sdmf_share_to_server("si1")
2404 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2405 d = defer.succeed(None)
2406 d.addCallback(lambda ignored:
2408 d.addCallback(lambda issdmf:
2409 self.failUnless(issdmf))
2410 d.addCallback(lambda ignored:
2411 self.shouldFail(LayoutInvalid, "test bad segment",
2413 mr.get_block_and_salt, 1))
2417 def test_read_with_prefetched_mdmf_data(self):
2418 # The MDMFSlotReadProxy will prefill certain fields if you pass
2419 # it data that you have already fetched. This is useful for
2420 # cases like the Servermap, which prefetches ~2kb of data while
2421 # finding out which shares are on the remote peer so that it
2422 # doesn't waste round trips.
2423 mdmf_data = self.build_test_mdmf_share()
2424 self.write_test_share_to_server("si1")
2425 def _make_mr(ignored, length):
2426 mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2429 d = defer.succeed(None)
2430 # This should be enough to fill in both the encoding parameters
2431 # and the table of offsets, which will complete the version
2432 # information tuple.
2433 d.addCallback(_make_mr, 123)
2434 d.addCallback(lambda mr:
2436 def _check_verinfo(verinfo):
2437 self.failUnless(verinfo)
2438 self.failUnlessEqual(len(verinfo), 9)
2448 self.failUnlessEqual(seqnum, 0)
2449 self.failUnlessEqual(root_hash, self.root_hash)
2450 self.failUnlessEqual(segsize, 6)
2451 self.failUnlessEqual(datalen, 36)
2452 self.failUnlessEqual(k, 3)
2453 self.failUnlessEqual(n, 10)
2454 expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2462 self.failUnlessEqual(expected_prefix, prefix)
2463 self.failUnlessEqual(self.rref.read_count, 0)
2464 d.addCallback(_check_verinfo)
2465 # This is not enough data to read a block and a share, so the
2466 # wrapper should attempt to read this from the remote server.
2467 d.addCallback(_make_mr, 123)
2468 d.addCallback(lambda mr:
2469 mr.get_block_and_salt(0))
2470 def _check_block_and_salt((block, salt)):
2471 self.failUnlessEqual(block, self.block)
2472 self.failUnlessEqual(salt, self.salt)
2473 self.failUnlessEqual(self.rref.read_count, 1)
2474 # This should be enough data to read one block.
2475 d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2476 d.addCallback(lambda mr:
2477 mr.get_block_and_salt(0))
2478 d.addCallback(_check_block_and_salt)
2482 def test_read_with_prefetched_sdmf_data(self):
2483 sdmf_data = self.build_test_sdmf_share()
2484 self.write_sdmf_share_to_server("si1")
2485 def _make_mr(ignored, length):
2486 mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2489 d = defer.succeed(None)
2490 # This should be enough to get us the encoding parameters,
2491 # offset table, and everything else we need to build a verinfo
2493 d.addCallback(_make_mr, 123)
2494 d.addCallback(lambda mr:
2496 def _check_verinfo(verinfo):
2497 self.failUnless(verinfo)
2498 self.failUnlessEqual(len(verinfo), 9)
2508 self.failUnlessEqual(seqnum, 0)
2509 self.failUnlessEqual(root_hash, self.root_hash)
2510 self.failUnlessEqual(salt, self.salt)
2511 self.failUnlessEqual(segsize, 36)
2512 self.failUnlessEqual(datalen, 36)
2513 self.failUnlessEqual(k, 3)
2514 self.failUnlessEqual(n, 10)
2515 expected_prefix = struct.pack(SIGNED_PREFIX,
2524 self.failUnlessEqual(expected_prefix, prefix)
2525 self.failUnlessEqual(self.rref.read_count, 0)
2526 d.addCallback(_check_verinfo)
2527 # This shouldn't be enough to read any share data.
2528 d.addCallback(_make_mr, 123)
2529 d.addCallback(lambda mr:
2530 mr.get_block_and_salt(0))
2531 def _check_block_and_salt((block, salt)):
2532 self.failUnlessEqual(block, self.block * 6)
2533 self.failUnlessEqual(salt, self.salt)
2534 # TODO: Fix the read routine so that it reads only the data
2535 # that it has cached if it can't read all of it.
2536 self.failUnlessEqual(self.rref.read_count, 2)
2538 # This should be enough to read share data.
2539 d.addCallback(_make_mr, self.offsets['share_data'])
2540 d.addCallback(lambda mr:
2541 mr.get_block_and_salt(0))
2542 d.addCallback(_check_block_and_salt)
2546 def test_read_with_empty_mdmf_file(self):
2547 # Some tests upload a file with no contents to test things
2548 # unrelated to the actual handling of the content of the file.
2549 # The reader should behave intelligently in these cases.
2550 self.write_test_share_to_server("si1", empty=True)
2551 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2552 # We should be able to get the encoding parameters, and they
2553 # should be correct.
2554 d = defer.succeed(None)
2555 d.addCallback(lambda ignored:
2556 mr.get_encoding_parameters())
2557 def _check_encoding_parameters(params):
2558 self.failUnlessEqual(len(params), 4)
2559 k, n, segsize, datalen = params
2560 self.failUnlessEqual(k, 3)
2561 self.failUnlessEqual(n, 10)
2562 self.failUnlessEqual(segsize, 0)
2563 self.failUnlessEqual(datalen, 0)
2564 d.addCallback(_check_encoding_parameters)
2566 # We should not be able to fetch a block, since there are no
2568 d.addCallback(lambda ignored:
2569 self.shouldFail(LayoutInvalid, "get block on empty file",
2571 mr.get_block_and_salt, 0))
2575 def test_read_with_empty_sdmf_file(self):
2576 self.write_sdmf_share_to_server("si1", empty=True)
2577 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2578 # We should be able to get the encoding parameters, and they
2580 d = defer.succeed(None)
2581 d.addCallback(lambda ignored:
2582 mr.get_encoding_parameters())
2583 def _check_encoding_parameters(params):
2584 self.failUnlessEqual(len(params), 4)
2585 k, n, segsize, datalen = params
2586 self.failUnlessEqual(k, 3)
2587 self.failUnlessEqual(n, 10)
2588 self.failUnlessEqual(segsize, 0)
2589 self.failUnlessEqual(datalen, 0)
2590 d.addCallback(_check_encoding_parameters)
2592 # It does not make sense to get a block in this format, so we
2593 # should not be able to.
2594 d.addCallback(lambda ignored:
2595 self.shouldFail(LayoutInvalid, "get block on an empty file",
2597 mr.get_block_and_salt, 0))
2601 def test_verinfo_with_sdmf_file(self):
2602 self.write_sdmf_share_to_server("si1")
2603 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2604 # We should be able to get the version information.
2605 d = defer.succeed(None)
2606 d.addCallback(lambda ignored:
2608 def _check_verinfo(verinfo):
2609 self.failUnless(verinfo)
2610 self.failUnlessEqual(len(verinfo), 9)
2620 self.failUnlessEqual(seqnum, 0)
2621 self.failUnlessEqual(root_hash, self.root_hash)
2622 self.failUnlessEqual(salt, self.salt)
2623 self.failUnlessEqual(segsize, 36)
2624 self.failUnlessEqual(datalen, 36)
2625 self.failUnlessEqual(k, 3)
2626 self.failUnlessEqual(n, 10)
2627 expected_prefix = struct.pack(">BQ32s16s BBQQ",
2636 self.failUnlessEqual(prefix, expected_prefix)
2637 self.failUnlessEqual(offsets, self.offsets)
2638 d.addCallback(_check_verinfo)
2642 def test_verinfo_with_mdmf_file(self):
2643 self.write_test_share_to_server("si1")
2644 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2645 d = defer.succeed(None)
2646 d.addCallback(lambda ignored:
2648 def _check_verinfo(verinfo):
2649 self.failUnless(verinfo)
2650 self.failUnlessEqual(len(verinfo), 9)
2660 self.failUnlessEqual(seqnum, 0)
2661 self.failUnlessEqual(root_hash, self.root_hash)
2663 self.failUnlessEqual(segsize, 6)
2664 self.failUnlessEqual(datalen, 36)
2665 self.failUnlessEqual(k, 3)
2666 self.failUnlessEqual(n, 10)
2667 expected_prefix = struct.pack(">BQ32s BBQQ",
2675 self.failUnlessEqual(prefix, expected_prefix)
2676 self.failUnlessEqual(offsets, self.offsets)
2677 d.addCallback(_check_verinfo)
2681 def test_sdmf_writer(self):
2682 # Go through the motions of writing an SDMF share to the storage
2683 # server. Then read the storage server to see that the share got
2684 # written in the way that we think it should have.
2686 # We do this first so that the necessary instance variables get
2687 # set the way we want them for the tests below.
2688 data = self.build_test_sdmf_share()
2689 sdmfr = SDMFSlotWriteProxy(0,
2694 # Put the block and salt.
2695 sdmfr.put_block(self.blockdata, 0, self.salt)
2697 # Put the encprivkey
2698 sdmfr.put_encprivkey(self.encprivkey)
2700 # Put the block and share hash chains
2701 sdmfr.put_blockhashes(self.block_hash_tree)
2702 sdmfr.put_sharehashes(self.share_hash_chain)
2703 sdmfr.put_root_hash(self.root_hash)
2706 sdmfr.put_signature(self.signature)
2708 # Put the verification key
2709 sdmfr.put_verification_key(self.verification_key)
2711 # Now check to make sure that nothing has been written yet.
2712 self.failUnlessEqual(self.rref.write_count, 0)
2714 # Now finish publishing
2715 d = sdmfr.finish_publishing()
2717 self.failUnlessEqual(self.rref.write_count, 1)
2718 read = self.ss.remote_slot_readv
2719 self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2721 d.addCallback(_then)
2725 def test_sdmf_writer_preexisting_share(self):
2726 data = self.build_test_sdmf_share()
2727 self.write_sdmf_share_to_server("si1")
2729 # Now there is a share on the storage server. To successfully
2730 # write, we need to set the checkstring correctly. When we
2731 # don't, no write should occur.
2732 sdmfw = SDMFSlotWriteProxy(0,
2737 sdmfw.put_block(self.blockdata, 0, self.salt)
2739 # Put the encprivkey
2740 sdmfw.put_encprivkey(self.encprivkey)
2742 # Put the block and share hash chains
2743 sdmfw.put_blockhashes(self.block_hash_tree)
2744 sdmfw.put_sharehashes(self.share_hash_chain)
2747 sdmfw.put_root_hash(self.root_hash)
2750 sdmfw.put_signature(self.signature)
2752 # Put the verification key
2753 sdmfw.put_verification_key(self.verification_key)
2755 # We shouldn't have a checkstring yet
2756 self.failUnlessEqual(sdmfw.get_checkstring(), "")
2758 d = sdmfw.finish_publishing()
2760 self.failIf(results[0])
2761 # this is the correct checkstring
2762 self._expected_checkstring = results[1][0][0]
2763 return self._expected_checkstring
2765 d.addCallback(_then)
2766 d.addCallback(sdmfw.set_checkstring)
2767 d.addCallback(lambda ignored:
2768 sdmfw.get_checkstring())
2769 d.addCallback(lambda checkstring:
2770 self.failUnlessEqual(checkstring, self._expected_checkstring))
2771 d.addCallback(lambda ignored:
2772 sdmfw.finish_publishing())
2773 def _then_again(results):
2774 self.failUnless(results[0])
2775 read = self.ss.remote_slot_readv
2776 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2777 {0: [struct.pack(">Q", 1)]})
2778 self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2780 d.addCallback(_then_again)
2784 class Stats(unittest.TestCase):
2787 self.sparent = LoggingServiceParent()
2788 self._lease_secret = itertools.count()
2790 return self.sparent.stopService()
2792 def workdir(self, name):
2793 basedir = os.path.join("storage", "Server", name)
2796 def create(self, name):
2797 workdir = self.workdir(name)
2798 ss = StorageServer(workdir, "\x00" * 20)
2799 ss.setServiceParent(self.sparent)
2802 def test_latencies(self):
2803 ss = self.create("test_latencies")
2804 for i in range(10000):
2805 ss.add_latency("allocate", 1.0 * i)
2806 for i in range(1000):
2807 ss.add_latency("renew", 1.0 * i)
2809 ss.add_latency("write", 1.0 * i)
2811 ss.add_latency("cancel", 2.0 * i)
2812 ss.add_latency("get", 5.0)
2814 output = ss.get_latencies()
2816 self.failUnlessEqual(sorted(output.keys()),
2817 sorted(["allocate", "renew", "cancel", "write", "get"]))
2818 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2819 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2820 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2821 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2822 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2823 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2824 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2825 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2826 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2828 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2829 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2830 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
2831 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2832 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2833 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2834 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2835 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2836 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2838 self.failUnlessEqual(len(ss.latencies["write"]), 20)
2839 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2840 self.failUnless(output["write"]["01_0_percentile"] is None, output)
2841 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
2842 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2843 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2844 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2845 self.failUnless(output["write"]["99_0_percentile"] is None, output)
2846 self.failUnless(output["write"]["99_9_percentile"] is None, output)
2848 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2849 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2850 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2851 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
2852 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2853 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2854 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2855 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2856 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2858 self.failUnlessEqual(len(ss.latencies["get"]), 1)
2859 self.failUnless(output["get"]["mean"] is None, output)
2860 self.failUnless(output["get"]["01_0_percentile"] is None, output)
2861 self.failUnless(output["get"]["10_0_percentile"] is None, output)
2862 self.failUnless(output["get"]["50_0_percentile"] is None, output)
2863 self.failUnless(output["get"]["90_0_percentile"] is None, output)
2864 self.failUnless(output["get"]["95_0_percentile"] is None, output)
2865 self.failUnless(output["get"]["99_0_percentile"] is None, output)
2866 self.failUnless(output["get"]["99_9_percentile"] is None, output)
2869 s = re.sub(r'<[^>]*>', ' ', s)
2870 s = re.sub(r'\s+', ' ', s)
2873 class MyBucketCountingCrawler(BucketCountingCrawler):
2874 def finished_prefix(self, cycle, prefix):
2875 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2877 d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler."""

    def add_bucket_counter(self):
        """Build the bucket-counting crawler and attach it to this server.

        The crawler's state file is "bucket_counter.state" inside
        self.storedir. The crawler is stored on self.bucket_counter and
        then parented to this server.
        """
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        crawler = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
2886 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2889 self.s = service.MultiService()
2890 self.s.startService()
2892 return self.s.stopService()
2894 def test_bucket_counter(self):
2895 basedir = "storage/BucketCounter/bucket_counter"
2896 fileutil.make_dirs(basedir)
2897 ss = StorageServer(basedir, "\x00" * 20)
2898 # to make sure we capture the bucket-counting-crawler in the middle
2899 # of a cycle, we reach in and reduce its maximum slice time to 0. We
2900 # also make it start sooner than usual.
2901 ss.bucket_counter.slow_start = 0
2902 orig_cpu_slice = ss.bucket_counter.cpu_slice
2903 ss.bucket_counter.cpu_slice = 0
2904 ss.setServiceParent(self.s)
2906 w = StorageStatus(ss)
2908 # this sample is before the crawler has started doing anything
2909 html = w.renderSynchronously()
2910 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2911 s = remove_tags(html)
2912 self.failUnlessIn("Accepting new shares: Yes", s)
2913 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2914 self.failUnlessIn("Total buckets: Not computed yet", s)
2915 self.failUnlessIn("Next crawl in", s)
2917 # give the bucket-counting-crawler one tick to get started. The
2918 # cpu_slice=0 will force it to yield right after it processes the
2921 d = fireEventually()
2922 def _check(ignored):
2923 # are we really right after the first prefix?
2924 state = ss.bucket_counter.get_state()
2925 if state["last-complete-prefix"] is None:
2926 d2 = fireEventually()
2927 d2.addCallback(_check)
2929 self.failUnlessEqual(state["last-complete-prefix"],
2930 ss.bucket_counter.prefixes[0])
2931 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2932 html = w.renderSynchronously()
2933 s = remove_tags(html)
2934 self.failUnlessIn(" Current crawl ", s)
2935 self.failUnlessIn(" (next work in ", s)
2936 d.addCallback(_check)
2938 # now give it enough time to complete a full cycle
2940 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2941 d.addCallback(lambda ignored: self.poll(_watch))
2942 def _check2(ignored):
2943 ss.bucket_counter.cpu_slice = orig_cpu_slice
2944 html = w.renderSynchronously()
2945 s = remove_tags(html)
2946 self.failUnlessIn("Total buckets: 0 (the number of", s)
2947 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2948 d.addCallback(_check2)
2951 def test_bucket_counter_cleanup(self):
2952 basedir = "storage/BucketCounter/bucket_counter_cleanup"
2953 fileutil.make_dirs(basedir)
2954 ss = StorageServer(basedir, "\x00" * 20)
2955 # to make sure we capture the bucket-counting-crawler in the middle
2956 # of a cycle, we reach in and reduce its maximum slice time to 0.
2957 ss.bucket_counter.slow_start = 0
2958 orig_cpu_slice = ss.bucket_counter.cpu_slice
2959 ss.bucket_counter.cpu_slice = 0
2960 ss.setServiceParent(self.s)
2962 d = fireEventually()
2964 def _after_first_prefix(ignored):
2965 state = ss.bucket_counter.state
2966 if state["last-complete-prefix"] is None:
2967 d2 = fireEventually()
2968 d2.addCallback(_after_first_prefix)
2970 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2971 # now sneak in and mess with its state, to make sure it cleans up
2972 # properly at the end of the cycle
2973 self.failUnlessEqual(state["last-complete-prefix"],
2974 ss.bucket_counter.prefixes[0])
2975 state["bucket-counts"][-12] = {}
2976 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2977 ss.bucket_counter.save_state()
2978 d.addCallback(_after_first_prefix)
2980 # now give it enough time to complete a cycle
2982 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2983 d.addCallback(lambda ignored: self.poll(_watch))
2984 def _check2(ignored):
2985 ss.bucket_counter.cpu_slice = orig_cpu_slice
2986 s = ss.bucket_counter.get_state()
2987 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2988 self.failIf("bogusprefix!" in s["storage-index-samples"],
2989 s["storage-index-samples"].keys())
2990 d.addCallback(_check2)
# Render the StorageStatus page at three points hooked into the bucket
# counter and check the progress text: the first rendering must say
# "complete (next work", the second and third must say "complete (ETA ".
2993 def test_bucket_counter_eta(self):
2994 basedir = "storage/BucketCounter/bucket_counter_eta"
2995 fileutil.make_dirs(basedir)
2996 ss = MyStorageServer(basedir, "\x00" * 20)
2997 ss.bucket_counter.slow_start = 0
2998 # these will be fired inside finished_prefix()
2999 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
3000 w = StorageStatus(ss)
# d is the Deferred that collects any failure raised by the checks below
3002 d = defer.Deferred()
3004 def _check_1(ignored):
3005 # no ETA is available yet
3006 html = w.renderSynchronously()
3007 s = remove_tags(html)
3008 self.failUnlessIn("complete (next work", s)
3010 def _check_2(ignored):
3011 # one prefix has finished, so an ETA based upon that elapsed time
3012 # should be available.
3013 html = w.renderSynchronously()
3014 s = remove_tags(html)
3015 self.failUnlessIn("complete (ETA ", s)
3017 def _check_3(ignored):
3018 # two prefixes have finished
3019 html = w.renderSynchronously()
3020 s = remove_tags(html)
3021 self.failUnlessIn("complete (ETA ", s)
# one check per hook; an exception in any check is forwarded to d.errback
3024 hooks[0].addCallback(_check_1).addErrback(d.errback)
3025 hooks[1].addCallback(_check_2).addErrback(d.errback)
3026 hooks[2].addCallback(_check_3).addErrback(d.errback)
# the hooks are wired up before the server is attached to its parent service
3028 ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler with a one-shot switch for the tests.

    While ``stop_after_first_bucket`` is true, the next processed bucket
    clears the switch and sets ``cpu_slice`` to -1.0.  Once the switch is
    clear, every call to ``yielding()`` sets ``cpu_slice`` to 500.
    """

    # tests set this to True on the instance to arm the one-shot switch
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        # do the real work first, then decide whether to trip the switch
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        # leave cpu_slice alone for as long as the switch is still armed
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
# Plain attribute holder: No_ST_BLOCKS_LeaseCheckingCrawler creates an
# instance and fills it with attributes copied via setattr().
3042 class BrokenStatResults:
# Lease checker variant used by No_ST_BLOCKS_StorageServer.  It builds a
# BrokenStatResults object by copying attributes of "s" one by one.
# NOTE(review): presumably "s" is an os.stat() result and the two "if"
# tests skip private names and "st_blocks", so the copy lacks st_blocks --
# confirm.
3044 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3047 bsr = BrokenStatResults()
3048 for attrname in dir(s):
3049 if attrname.startswith("_"):
3051 if attrname == "st_blocks":
3053 setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    """StorageServer that names the instrumented crawler as its lease checker class."""

    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer that names the no-st_blocks crawler as its lease checker class."""

    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
# Tests for the storage server's lease checker (ss.lease_checker).
# self.s is the MultiService that each test attaches its server to with
# ss.setServiceParent(self.s); it is started here and stopped afterwards.
3061 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3064 self.s = service.MultiService()
3065 self.s.startService()
# the result of stopService() is returned to the caller
3067 return self.s.stopService()
# Populate the server "ss" with four shares -- two immutable
# (storage indexes "\x00"*16 and "\x01"*16) and two mutable ("\x02"*16 and
# "\x03"*16) -- each holding 1000 bytes of "\xff".  immutable_si_1 and
# mutable_si_3 receive one additional lease.  The storage indexes and all
# lease secrets are recorded on self (sis, renew_secrets, cancel_secrets).
3069 def make_shares(self, ss):
3071 return (si, hashutil.tagged_hash("renew", si),
3072 hashutil.tagged_hash("cancel", si))
# like the helper above, plus a write-enabler secret for mutable slots
3073 def make_mutable(si):
3074 return (si, hashutil.tagged_hash("renew", si),
3075 hashutil.tagged_hash("cancel", si),
3076 hashutil.tagged_hash("write-enabler", si))
# secrets for an additional lease; "num" is folded into the hash tags
3077 def make_extra_lease(si, num):
3078 return (hashutil.tagged_hash("renew-%d" % num, si),
3079 hashutil.tagged_hash("cancel-%d" % num, si))
3081 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3082 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3083 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3084 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3085 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3086 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3088 canary = FakeCanary()
3089 # note: 'tahoe debug dump-share' will not handle this file, since the
3090 # inner contents are not a valid CHK share
3091 data = "\xff" * 1000
3093 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3095 w[0].remote_write(0, data)
3098 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3100 w[0].remote_write(0, data)
3102 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
# the mutable shares are created through the test-and-read-and-write call
3104 writev = ss.remote_slot_testv_and_readv_and_writev
3105 writev(mutable_si_2, (we2, rs2, cs2),
3106 {0: ([], [(0,data)], len(data))}, [])
3107 writev(mutable_si_3, (we3, rs3, cs3),
3108 {0: ([], [(0,data)], len(data))}, [])
3109 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
# Index layout of the two secret lists: [0] immutable_si_0, [1] and [2]
# immutable_si_1 (first and extra lease), [3] mutable_si_2, [4] and [5]
# mutable_si_3 (first and extra lease).
3111 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3112 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3113 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
# Crawl a server holding the four make_shares() shares without any
# expiration options and check the lease checker's state and the rendered
# status page at three points: before the crawl starts, after the first
# bucket, and after the first full cycle.  The assertions require that
# nothing is recovered and that every lease is still present at the end.
3115 def test_basic(self):
3116 basedir = "storage/LeaseCrawler/basic"
3117 fileutil.make_dirs(basedir)
3118 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3119 # make it start sooner than usual.
3120 lc = ss.lease_checker
# arm the instrumented crawler's one-shot switch (see
# InstrumentedLeaseCheckingCrawler.process_bucket)
3123 lc.stop_after_first_bucket = True
3124 webstatus = StorageStatus(ss)
3126 # create a few shares, with some leases on them
3127 self.make_shares(ss)
3128 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3130 # add a non-sharefile to exercise another code path
3131 fn = os.path.join(ss.sharedir,
3132 storage_index_to_dir(immutable_si_0),
3135 f.write("I am not a share.\n")
3138 # this is before the crawl has started, so we're not in a cycle yet
3139 initial_state = lc.get_state()
3140 self.failIf(lc.get_progress()["cycle-in-progress"])
3141 self.failIfIn("cycle-to-date", initial_state)
3142 self.failIfIn("estimated-remaining-cycle", initial_state)
3143 self.failIfIn("estimated-current-cycle", initial_state)
3144 self.failUnlessIn("history", initial_state)
3145 self.failUnlessEqual(initial_state["history"], {})
# attaching the server to the running parent service is what lets the
# crawl begin; everything above ran before any cycle
3147 ss.setServiceParent(self.s)
3151 d = fireEventually()
3153 # now examine the state right after the first bucket has been
# Re-arms itself through fireEventually() until "cycle-to-date" appears in
# the crawler state, then checks the mid-cycle bookkeeping.
3155 def _after_first_bucket(ignored):
3156 initial_state = lc.get_state()
3157 if "cycle-to-date" not in initial_state:
3158 d2 = fireEventually()
3159 d2.addCallback(_after_first_bucket)
3161 self.failUnlessIn("cycle-to-date", initial_state)
3162 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3163 self.failUnlessIn("estimated-current-cycle", initial_state)
3164 self.failUnlessIn("history", initial_state)
3165 self.failUnlessEqual(initial_state["history"], {})
3167 so_far = initial_state["cycle-to-date"]
3168 self.failUnlessEqual(so_far["expiration-enabled"], False)
3169 self.failUnlessIn("configured-expiration-mode", so_far)
3170 self.failUnlessIn("lease-age-histogram", so_far)
3171 lah = so_far["lease-age-histogram"]
3172 self.failUnlessEqual(type(lah), list)
3173 self.failUnlessEqual(len(lah), 1)
# a single histogram entry with a count of 1 -- presumably
# (bin start, bin end, number of leases); confirm against expirer.py
3174 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3175 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3176 self.failUnlessEqual(so_far["corrupt-shares"], [])
3177 sr1 = so_far["space-recovered"]
3178 self.failUnlessEqual(sr1["examined-buckets"], 1)
3179 self.failUnlessEqual(sr1["examined-shares"], 1)
3180 self.failUnlessEqual(sr1["actual-shares"], 0)
3181 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3182 self.failUnlessEqual(sr1["original-sharebytes"], 0)
# the estimate for the rest of the cycle must hold real numbers, not None
3183 left = initial_state["estimated-remaining-cycle"]
3184 sr2 = left["space-recovered"]
3185 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3186 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3187 self.failIfEqual(sr2["actual-shares"], None)
3188 self.failIfEqual(sr2["configured-diskbytes"], None)
3189 self.failIfEqual(sr2["original-sharebytes"], None)
3190 d.addCallback(_after_first_bucket)
3191 d.addCallback(lambda ign: self.render1(webstatus))
# checks the wording of the status page while the cycle is in progress
3192 def _check_html_in_cycle(html):
3193 s = remove_tags(html)
3194 self.failUnlessIn("So far, this cycle has examined "
3195 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3196 self.failUnlessIn("and has recovered: "
3197 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3198 "0 B (0 B / 0 B)", s)
3199 self.failUnlessIn("If expiration were enabled, "
3200 "we would have recovered: "
3201 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3202 " 0 B (0 B / 0 B) by now", s)
3203 self.failUnlessIn("and the remainder of this cycle "
3204 "would probably recover: "
3205 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3206 " 0 B (0 B / 0 B)", s)
3207 self.failUnlessIn("and the whole cycle would probably recover: "
3208 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3209 " 0 B (0 B / 0 B)", s)
3210 self.failUnlessIn("if we were strictly using each lease's default "
3211 "31-day lease lifetime", s)
3212 self.failUnlessIn("this cycle would be expected to recover: ", s)
3213 d.addCallback(_check_html_in_cycle)
3215 # wait for the crawler to finish the first cycle. Nothing should have
3218 return bool(lc.get_state()["last-cycle-finished"] is not None)
3219 d.addCallback(lambda ign: self.poll(_wait))
# after the cycle: the in-progress keys are gone and history[0] describes
# the finished cycle
3221 def _after_first_cycle(ignored):
3223 self.failIf("cycle-to-date" in s)
3224 self.failIf("estimated-remaining-cycle" in s)
3225 self.failIf("estimated-current-cycle" in s)
3226 last = s["history"][0]
3227 self.failUnlessIn("cycle-start-finish-times", last)
3228 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3229 self.failUnlessEqual(last["expiration-enabled"], False)
3230 self.failUnlessIn("configured-expiration-mode", last)
3232 self.failUnlessIn("lease-age-histogram", last)
3233 lah = last["lease-age-histogram"]
3234 self.failUnlessEqual(type(lah), list)
3235 self.failUnlessEqual(len(lah), 1)
# 6 = all leases created by make_shares(): 1 + 2 + 1 + 2
3236 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
# two shares carry one lease each, two shares carry two leases each
3238 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3239 self.failUnlessEqual(last["corrupt-shares"], [])
3241 rec = last["space-recovered"]
3242 self.failUnlessEqual(rec["examined-buckets"], 4)
3243 self.failUnlessEqual(rec["examined-shares"], 4)
3244 self.failUnlessEqual(rec["actual-buckets"], 0)
3245 self.failUnlessEqual(rec["original-buckets"], 0)
3246 self.failUnlessEqual(rec["configured-buckets"], 0)
3247 self.failUnlessEqual(rec["actual-shares"], 0)
3248 self.failUnlessEqual(rec["original-shares"], 0)
3249 self.failUnlessEqual(rec["configured-shares"], 0)
3250 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3251 self.failUnlessEqual(rec["original-diskbytes"], 0)
3252 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3253 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3254 self.failUnlessEqual(rec["original-sharebytes"], 0)
3255 self.failUnlessEqual(rec["configured-sharebytes"], 0)
# helpers: first share file of a storage index, and its number of leases
3257 def _get_sharefile(si):
3258 return list(ss._iter_share_files(si))[0]
3259 def count_leases(si):
3260 return len(list(_get_sharefile(si).get_leases()))
# lease counts are exactly what make_shares() created
3261 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3262 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3263 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3264 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3265 d.addCallback(_after_first_cycle)
3266 d.addCallback(lambda ign: self.render1(webstatus))
# checks the wording of the status page after the cycle has finished
3267 def _check_html(html):
3268 s = remove_tags(html)
3269 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3270 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3271 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3272 "(2 mutable / 2 immutable),", s)
3273 self.failUnlessIn("but expiration was not enabled", s)
3274 d.addCallback(_check_html)
3275 d.addCallback(lambda ign: self.render_json(webstatus))
# the JSON rendering must expose both lease-checker sections
3276 def _check_json(json):
3277 data = simplejson.loads(json)
3278 self.failUnlessIn("lease-checker", data)
3279 self.failUnlessIn("lease-checker-progress", data)
3280 d.addCallback(_check_json)
# Overwrite the expiration time of one lease directly in the share file.
#   sf              -- share file object; its get_leases(), home and
#                      _write_lease_record() are used
#   renew_secret    -- selects the lease whose renew_secret matches
#   new_expire_time -- value stored into the lease's expiration_time
# The matching lease record is rewritten in place, at the index it has in
# get_leases(), through a handle opened on sf.home in 'rb+' mode.
# The IndexError at the end reports that no lease carried renew_secret.
3283 def backdate_lease(self, sf, renew_secret, new_expire_time):
3284 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3285 # "renew" a lease with a new_expire_time that is older than what the
3286 # current lease has), so we have to reach inside it.
3287 for i,lease in enumerate(sf.get_leases()):
3288 if lease.renew_secret == renew_secret:
3289 lease.expiration_time = new_expire_time
3290 f = open(sf.home, 'rb+')
3291 sf._write_lease_record(f, i, lease)
3294 raise IndexError("unable to renew non-existent lease")
# Age-based expiration: the server is created with expiration enabled,
# mode "age" and expiration_override_lease_duration=2000, and the first
# lease of every share is backdated.  After one cycle the two single-lease
# shares must be gone and the two shares that also hold an extra lease must
# survive with exactly one lease each.
3296 def test_expire_age(self):
3297 basedir = "storage/LeaseCrawler/expire_age"
3298 fileutil.make_dirs(basedir)
3299 # setting expiration_time to 2000 means that any lease which is more
3300 # than 2000s old will be expired.
3301 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3302 expiration_enabled=True,
3303 expiration_mode="age",
3304 expiration_override_lease_duration=2000)
3305 # make it start sooner than usual.
3306 lc = ss.lease_checker
# arm the instrumented crawler's one-shot switch (see
# InstrumentedLeaseCheckingCrawler.process_bucket)
3308 lc.stop_after_first_bucket = True
3309 webstatus = StorageStatus(ss)
3311 # create a few shares, with some leases on them
3312 self.make_shares(ss)
3313 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# helpers: number of share files for a storage index, its first share file,
# and the number of leases on that first share file
3315 def count_shares(si):
3316 return len(list(ss._iter_share_files(si)))
3317 def _get_sharefile(si):
3318 return list(ss._iter_share_files(si))[0]
3319 def count_leases(si):
3320 return len(list(_get_sharefile(si).get_leases()))
# sanity-check the starting point produced by make_shares()
3322 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3323 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3324 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3325 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3326 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3327 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3328 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3329 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3331 # artificially crank back the expiration time on the first lease of
3332 # each share, to make it look like it expired already (age=1000s).
3333 # Some shares have an extra lease which is set to expire at the
3334 # default time in 31 days from now (age=31days). We then run the
3335 # crawler, which will expire the first lease, making some shares get
3336 # deleted and others stay alive (with one remaining lease)
# renew_secrets[0], [1], [3] and [4] are the first leases of the four
# shares (see make_shares); the extra leases [2] and [5] are left untouched
3339 sf0 = _get_sharefile(immutable_si_0)
3340 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
# sizes are captured now because these two share files are expected to be
# deleted by the crawl
3341 sf0_size = os.stat(sf0.home).st_size
3343 # immutable_si_1 gets an extra lease
3344 sf1 = _get_sharefile(immutable_si_1)
3345 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3347 sf2 = _get_sharefile(mutable_si_2)
3348 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3349 sf2_size = os.stat(sf2.home).st_size
3351 # mutable_si_3 gets an extra lease
3352 sf3 = _get_sharefile(mutable_si_3)
3353 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3355 ss.setServiceParent(self.s)
3357 d = fireEventually()
3358 # examine the state right after the first bucket has been processed
3359 def _after_first_bucket(ignored):
3360 p = lc.get_progress()
3361 if not p["cycle-in-progress"]:
3362 d2 = fireEventually()
3363 d2.addCallback(_after_first_bucket)
3365 d.addCallback(_after_first_bucket)
3366 d.addCallback(lambda ign: self.render1(webstatus))
3367 def _check_html_in_cycle(html):
3368 s = remove_tags(html)
3369 # the first bucket encountered gets deleted, and its prefix
3370 # happens to be about 1/5th of the way through the ring, so the
3371 # predictor thinks we'll have 5 shares and that we'll delete them
3372 # all. This part of the test depends upon the SIs landing right
3373 # where they do now.
3374 self.failUnlessIn("The remainder of this cycle is expected to "
3375 "recover: 4 shares, 4 buckets", s)
3376 self.failUnlessIn("The whole cycle is expected to examine "
3377 "5 shares in 5 buckets and to recover: "
3378 "5 shares, 5 buckets", s)
3379 d.addCallback(_check_html_in_cycle)
3381 # wait for the crawler to finish the first cycle. Two shares should
3384 return bool(lc.get_state()["last-cycle-finished"] is not None)
3385 d.addCallback(lambda ign: self.poll(_wait))
3387 def _after_first_cycle(ignored):
# single-lease shares are gone; two-lease shares keep one lease
3388 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3389 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3390 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3391 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3392 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3393 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3396 last = s["history"][0]
3398 self.failUnlessEqual(last["expiration-enabled"], True)
3399 self.failUnlessEqual(last["configured-expiration-mode"],
3400 ("age", 2000, None, ("mutable", "immutable")))
3401 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3403 rec = last["space-recovered"]
3404 self.failUnlessEqual(rec["examined-buckets"], 4)
3405 self.failUnlessEqual(rec["examined-shares"], 4)
3406 self.failUnlessEqual(rec["actual-buckets"], 2)
3407 self.failUnlessEqual(rec["original-buckets"], 2)
3408 self.failUnlessEqual(rec["configured-buckets"], 2)
3409 self.failUnlessEqual(rec["actual-shares"], 2)
3410 self.failUnlessEqual(rec["original-shares"], 2)
3411 self.failUnlessEqual(rec["configured-shares"], 2)
# recovered share bytes must equal the sizes of the two deleted files
3412 size = sf0_size + sf2_size
3413 self.failUnlessEqual(rec["actual-sharebytes"], size)
3414 self.failUnlessEqual(rec["original-sharebytes"], size)
3415 self.failUnlessEqual(rec["configured-sharebytes"], size)
3416 # different platforms have different notions of "blocks used by
3417 # this file", so merely assert that it's a number
3418 self.failUnless(rec["actual-diskbytes"] >= 0,
3419 rec["actual-diskbytes"])
3420 self.failUnless(rec["original-diskbytes"] >= 0,
3421 rec["original-diskbytes"])
3422 self.failUnless(rec["configured-diskbytes"] >= 0,
3423 rec["configured-diskbytes"])
3424 d.addCallback(_after_first_cycle)
3425 d.addCallback(lambda ign: self.render1(webstatus))
3426 def _check_html(html):
3427 s = remove_tags(html)
3428 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
# "33 minutes" is how the page words the 2000-second override (2000/60 = 33.3)
3429 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3430 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3431 d.addCallback(_check_html)
# Cutoff-date expiration: the server is created with expiration enabled,
# mode "cutoff-date" and expiration_cutoff_date=then (now - 2000), and the
# first lease of every share is backdated.  After one cycle the two
# single-lease shares must be gone and the two shares that also hold an
# extra lease must survive with exactly one lease each.
3434 def test_expire_cutoff_date(self):
3435 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3436 fileutil.make_dirs(basedir)
3437 # setting cutoff-date to 2000 seconds ago means that any lease which
3438 # is more than 2000s old will be expired.
3440 then = int(now - 2000)
3441 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3442 expiration_enabled=True,
3443 expiration_mode="cutoff-date",
3444 expiration_cutoff_date=then)
3445 # make it start sooner than usual.
3446 lc = ss.lease_checker
# arm the instrumented crawler's one-shot switch (see
# InstrumentedLeaseCheckingCrawler.process_bucket)
3448 lc.stop_after_first_bucket = True
3449 webstatus = StorageStatus(ss)
3451 # create a few shares, with some leases on them
3452 self.make_shares(ss)
3453 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# helpers: number of share files for a storage index, its first share file,
# and the number of leases on that first share file
3455 def count_shares(si):
3456 return len(list(ss._iter_share_files(si)))
3457 def _get_sharefile(si):
3458 return list(ss._iter_share_files(si))[0]
3459 def count_leases(si):
3460 return len(list(_get_sharefile(si).get_leases()))
# sanity-check the starting point produced by make_shares()
3462 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3463 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3464 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3465 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3466 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3467 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3468 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3469 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3471 # artificially crank back the expiration time on the first lease of
3472 # each share, to make it look like was renewed 3000s ago. To achieve
3473 # this, we need to set the expiration time to now-3000+31days. This
3474 # will change when the lease format is improved to contain both
3475 # create/renew time and duration.
3476 new_expiration_time = now - 3000 + 31*24*60*60
3478 # Some shares have an extra lease which is set to expire at the
3479 # default time in 31 days from now (age=31days). We then run the
3480 # crawler, which will expire the first lease, making some shares get
3481 # deleted and others stay alive (with one remaining lease)
# renew_secrets[0], [1], [3] and [4] are the first leases of the four
# shares (see make_shares); the extra leases [2] and [5] are left untouched
3483 sf0 = _get_sharefile(immutable_si_0)
3484 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
# sizes are captured now because these two share files are expected to be
# deleted by the crawl
3485 sf0_size = os.stat(sf0.home).st_size
3487 # immutable_si_1 gets an extra lease
3488 sf1 = _get_sharefile(immutable_si_1)
3489 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3491 sf2 = _get_sharefile(mutable_si_2)
3492 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3493 sf2_size = os.stat(sf2.home).st_size
3495 # mutable_si_3 gets an extra lease
3496 sf3 = _get_sharefile(mutable_si_3)
3497 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3499 ss.setServiceParent(self.s)
3501 d = fireEventually()
3502 # examine the state right after the first bucket has been processed
3503 def _after_first_bucket(ignored):
3504 p = lc.get_progress()
3505 if not p["cycle-in-progress"]:
3506 d2 = fireEventually()
3507 d2.addCallback(_after_first_bucket)
3509 d.addCallback(_after_first_bucket)
3510 d.addCallback(lambda ign: self.render1(webstatus))
3511 def _check_html_in_cycle(html):
3512 s = remove_tags(html)
3513 # the first bucket encountered gets deleted, and its prefix
3514 # happens to be about 1/5th of the way through the ring, so the
3515 # predictor thinks we'll have 5 shares and that we'll delete them
3516 # all. This part of the test depends upon the SIs landing right
3517 # where they do now.
3518 self.failUnlessIn("The remainder of this cycle is expected to "
3519 "recover: 4 shares, 4 buckets", s)
3520 self.failUnlessIn("The whole cycle is expected to examine "
3521 "5 shares in 5 buckets and to recover: "
3522 "5 shares, 5 buckets", s)
3523 d.addCallback(_check_html_in_cycle)
3525 # wait for the crawler to finish the first cycle. Two shares should
3528 return bool(lc.get_state()["last-cycle-finished"] is not None)
3529 d.addCallback(lambda ign: self.poll(_wait))
3531 def _after_first_cycle(ignored):
# single-lease shares are gone; two-lease shares keep one lease
3532 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3533 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3534 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3535 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3536 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3537 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3540 last = s["history"][0]
3542 self.failUnlessEqual(last["expiration-enabled"], True)
3543 self.failUnlessEqual(last["configured-expiration-mode"],
3544 ("cutoff-date", None, then,
3545 ("mutable", "immutable")))
3546 self.failUnlessEqual(last["leases-per-share-histogram"],
3549 rec = last["space-recovered"]
3550 self.failUnlessEqual(rec["examined-buckets"], 4)
3551 self.failUnlessEqual(rec["examined-shares"], 4)
3552 self.failUnlessEqual(rec["actual-buckets"], 2)
3553 self.failUnlessEqual(rec["original-buckets"], 0)
3554 self.failUnlessEqual(rec["configured-buckets"], 2)
3555 self.failUnlessEqual(rec["actual-shares"], 2)
3556 self.failUnlessEqual(rec["original-shares"], 0)
3557 self.failUnlessEqual(rec["configured-shares"], 2)
3558 size = sf0_size + sf2_size
3559 self.failUnlessEqual(rec["actual-sharebytes"], size)
3560 self.failUnlessEqual(rec["original-sharebytes"], 0)
3561 self.failUnlessEqual(rec["configured-sharebytes"], size)
3562 # different platforms have different notions of "blocks used by
3563 # this file", so merely assert that it's a number
3564 self.failUnless(rec["actual-diskbytes"] >= 0,
3565 rec["actual-diskbytes"])
3566 self.failUnless(rec["original-diskbytes"] >= 0,
3567 rec["original-diskbytes"])
3568 self.failUnless(rec["configured-diskbytes"] >= 0,
3569 rec["configured-diskbytes"])
3570 d.addCallback(_after_first_cycle)
3571 d.addCallback(lambda ign: self.render1(webstatus))
3572 def _check_html(html):
3573 s = remove_tags(html)
3574 self.failUnlessIn("Expiration Enabled:"
3575 " expired leases will be removed", s)
# the page is expected to show the cutoff both as YYYY-MM-DD and as
# DD-Mon-YYYY, computed from "then" in UTC
3576 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3577 substr = "Leases created or last renewed before %s will be considered expired." % date
3578 self.failUnlessIn(substr, s)
3579 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3580 d.addCallback(_check_html)
# With expiration_sharetypes=("immutable",) and all six leases backdated,
# one cycle must delete both immutable shares while both mutable shares keep
# their original lease counts (1 and 2).
3583 def test_only_immutable(self):
3584 basedir = "storage/LeaseCrawler/only_immutable"
3585 fileutil.make_dirs(basedir)
3587 then = int(now - 2000)
3588 ss = StorageServer(basedir, "\x00" * 20,
3589 expiration_enabled=True,
3590 expiration_mode="cutoff-date",
3591 expiration_cutoff_date=then,
3592 expiration_sharetypes=("immutable",))
3593 lc = ss.lease_checker
3595 webstatus = StorageStatus(ss)
3597 self.make_shares(ss)
3598 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3599 # set all leases to be expirable
3600 new_expiration_time = now - 3000 + 31*24*60*60
# helpers: number of share files for a storage index, its first share file,
# and the number of leases on that first share file
3602 def count_shares(si):
3603 return len(list(ss._iter_share_files(si)))
3604 def _get_sharefile(si):
3605 return list(ss._iter_share_files(si))[0]
3606 def count_leases(si):
3607 return len(list(_get_sharefile(si).get_leases()))
# every entry of self.renew_secrets ([0]..[5]) is backdated, including the
# extra leases on immutable_si_1 and mutable_si_3
3609 sf0 = _get_sharefile(immutable_si_0)
3610 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3611 sf1 = _get_sharefile(immutable_si_1)
3612 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3613 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3614 sf2 = _get_sharefile(mutable_si_2)
3615 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3616 sf3 = _get_sharefile(mutable_si_3)
3617 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3618 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3620 ss.setServiceParent(self.s)
3622 return bool(lc.get_state()["last-cycle-finished"] is not None)
3623 d = self.poll(_wait)
3625 def _after_first_cycle(ignored):
# immutable shares deleted, mutable shares and their leases untouched
3626 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3627 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3628 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3629 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3630 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3631 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3632 d.addCallback(_after_first_cycle)
3633 d.addCallback(lambda ign: self.render1(webstatus))
# the status page must name the configured share type
3634 def _check_html(html):
3635 s = remove_tags(html)
3636 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3637 d.addCallback(_check_html)
# With expiration_sharetypes=("mutable",) and all six leases backdated,
# one cycle must delete both mutable shares while both immutable shares keep
# their original lease counts (1 and 2).
3640 def test_only_mutable(self):
3641 basedir = "storage/LeaseCrawler/only_mutable"
3642 fileutil.make_dirs(basedir)
3644 then = int(now - 2000)
3645 ss = StorageServer(basedir, "\x00" * 20,
3646 expiration_enabled=True,
3647 expiration_mode="cutoff-date",
3648 expiration_cutoff_date=then,
3649 expiration_sharetypes=("mutable",))
3650 lc = ss.lease_checker
3652 webstatus = StorageStatus(ss)
3654 self.make_shares(ss)
3655 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3656 # set all leases to be expirable
3657 new_expiration_time = now - 3000 + 31*24*60*60
# helpers: number of share files for a storage index, its first share file,
# and the number of leases on that first share file
3659 def count_shares(si):
3660 return len(list(ss._iter_share_files(si)))
3661 def _get_sharefile(si):
3662 return list(ss._iter_share_files(si))[0]
3663 def count_leases(si):
3664 return len(list(_get_sharefile(si).get_leases()))
# every entry of self.renew_secrets ([0]..[5]) is backdated, including the
# extra leases on immutable_si_1 and mutable_si_3
3666 sf0 = _get_sharefile(immutable_si_0)
3667 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3668 sf1 = _get_sharefile(immutable_si_1)
3669 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3670 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3671 sf2 = _get_sharefile(mutable_si_2)
3672 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3673 sf3 = _get_sharefile(mutable_si_3)
3674 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3675 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3677 ss.setServiceParent(self.s)
3679 return bool(lc.get_state()["last-cycle-finished"] is not None)
3680 d = self.poll(_wait)
3682 def _after_first_cycle(ignored):
# immutable shares and their leases untouched, mutable shares deleted
3683 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3684 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3685 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3686 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3687 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3688 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3689 d.addCallback(_after_first_cycle)
3690 d.addCallback(lambda ign: self.render1(webstatus))
# the status page must name the configured share type
3691 def _check_html(html):
3692 s = remove_tags(html)
3693 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3694 d.addCallback(_check_html)
def test_bad_mode(self):
    # Constructing a StorageServer with an unrecognized expiration_mode
    # must raise ValueError, and the message must list the two valid modes.
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    server_args = (workdir, "\x00" * 20)
    err = self.failUnlessRaises(ValueError, StorageServer, *server_args,
                                expiration_mode="bogus")
    expected = "GC mode 'bogus' must be 'age' or 'cutoff-date'"
    self.failUnlessIn(expected, str(err))
# time_format.parse_duration must accept a number followed by a day, month
# or year unit (singular, plural or abbreviated "mo", with or without a
# space) and return the matching multiple of DAY, MONTH or YEAR; a string
# with an unknown unit must raise ValueError with an explanatory message.
3705 def test_parse_duration(self):
3709 p = time_format.parse_duration
3710 self.failUnlessEqual(p("7days"), 7*DAY)
3711 self.failUnlessEqual(p("31day"), 31*DAY)
3712 self.failUnlessEqual(p("60 days"), 60*DAY)
3713 self.failUnlessEqual(p("2mo"), 2*MONTH)
3714 self.failUnlessEqual(p("3 month"), 3*MONTH)
3715 self.failUnlessEqual(p("2years"), 2*YEAR)
# unknown unit: the error text must quote the offending input
3716 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3717 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    # time_format.parse_date must turn an ISO date into an int timestamp;
    # 1237334400 is 2009-03-18 00:00:00 UTC in seconds since the epoch.
    parse = time_format.parse_date
    stamp = parse("2009-03-18")
    self.failUnless(isinstance(stamp, int), stamp)
    self.failUnlessEqual(stamp, 1237334400)
# Let the lease checker run until its "last-cycle-finished" counter reaches
# at least 15, then check that only ten history entries are kept and that
# their keys span 6 through 15.
3724 def test_limited_history(self):
3725 basedir = "storage/LeaseCrawler/limited_history"
3726 fileutil.make_dirs(basedir)
3727 ss = StorageServer(basedir, "\x00" * 20)
3728 # make it start sooner than usual.
3729 lc = ss.lease_checker
3733 # create a few shares, with some leases on them
3734 self.make_shares(ss)
3736 ss.setServiceParent(self.s)
# poll predicate: reads the crawler's raw state dict directly
3738 def _wait_until_15_cycles_done():
3739 last = lc.state["last-cycle-finished"]
3740 if last is not None and last >= 15:
3745 d = self.poll(_wait_until_15_cycles_done)
# NOTE(review): "h" is presumably the lease checker's history dict, keyed
# by cycle number -- confirm
3747 def _check(ignored):
3750 self.failUnlessEqual(len(h), 10)
3751 self.failUnlessEqual(max(h.keys()), 15)
3752 self.failUnlessEqual(min(h.keys()), 6)
3753 d.addCallback(_check)
# Interrupt the lease checker very early in its first cycle (cpu_slice is
# set to -1.0) and check that the state still contains the cycle estimates
# but that every "space-recovered" estimate is None.
3756 def test_unpredictable_future(self):
3757 basedir = "storage/LeaseCrawler/unpredictable_future"
3758 fileutil.make_dirs(basedir)
3759 ss = StorageServer(basedir, "\x00" * 20)
3760 # make it start sooner than usual.
3761 lc = ss.lease_checker
3763 lc.cpu_slice = -1.0 # stop quickly
3765 self.make_shares(ss)
3767 ss.setServiceParent(self.s)
3769 d = fireEventually()
# Re-arms itself through fireEventually() until "cycle-to-date" shows up.
3770 def _check(ignored):
3771 # this should fire after the first bucket is complete, but before
3772 # the first prefix is complete, so the progress-measurer won't
3773 # think we've gotten far enough to raise our percent-complete
3774 # above 0%, triggering the cannot-predict-the-future code in
3775 # expirer.py . This will have to change if/when the
3776 # progress-measurer gets smart enough to count buckets (we'll
3777 # have to interrupt it even earlier, before it's finished the
3780 if "cycle-to-date" not in s:
3781 d2 = fireEventually()
3782 d2.addCallback(_check)
3784 self.failUnlessIn("cycle-to-date", s)
3785 self.failUnlessIn("estimated-remaining-cycle", s)
3786 self.failUnlessIn("estimated-current-cycle", s)
# every estimate for the remainder of the cycle must be None
3788 left = s["estimated-remaining-cycle"]["space-recovered"]
3789 self.failUnlessEqual(left["actual-buckets"], None)
3790 self.failUnlessEqual(left["original-buckets"], None)
3791 self.failUnlessEqual(left["configured-buckets"], None)
3792 self.failUnlessEqual(left["actual-shares"], None)
3793 self.failUnlessEqual(left["original-shares"], None)
3794 self.failUnlessEqual(left["configured-shares"], None)
3795 self.failUnlessEqual(left["actual-diskbytes"], None)
3796 self.failUnlessEqual(left["original-diskbytes"], None)
3797 self.failUnlessEqual(left["configured-diskbytes"], None)
3798 self.failUnlessEqual(left["actual-sharebytes"], None)
3799 self.failUnlessEqual(left["original-sharebytes"], None)
3800 self.failUnlessEqual(left["configured-sharebytes"], None)
# NOTE(review): "full" reads the same "estimated-remaining-cycle" entry as
# "left" above, so this second group repeats the first; given its name it
# was possibly meant to read "estimated-current-cycle" -- confirm.
3802 full = s["estimated-remaining-cycle"]["space-recovered"]
3803 self.failUnlessEqual(full["actual-buckets"], None)
3804 self.failUnlessEqual(full["original-buckets"], None)
3805 self.failUnlessEqual(full["configured-buckets"], None)
3806 self.failUnlessEqual(full["actual-shares"], None)
3807 self.failUnlessEqual(full["original-shares"], None)
3808 self.failUnlessEqual(full["configured-shares"], None)
3809 self.failUnlessEqual(full["actual-diskbytes"], None)
3810 self.failUnlessEqual(full["original-diskbytes"], None)
3811 self.failUnlessEqual(full["configured-diskbytes"], None)
3812 self.failUnlessEqual(full["actual-sharebytes"], None)
3813 self.failUnlessEqual(full["original-sharebytes"], None)
3814 self.failUnlessEqual(full["configured-sharebytes"], None)
3816 d.addCallback(_check)
3819 def test_no_st_blocks(self):
# Purpose: build a No_ST_BLOCKS_StorageServer in "age" expiration mode with
# a negative lease-duration override, populate it with self.make_shares(),
# wait until the lease checker reports a finished cycle, then check the
# "configured-" space-recovered figures stored in history entry 0.
3820 basedir = "storage/LeaseCrawler/no_st_blocks"
3821 fileutil.make_dirs(basedir)
3822 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3823 expiration_mode="age",
3824 expiration_override_lease_duration=-1000)
3825 # a negative expiration_time= means the "configured-"
3826 # space-recovered counts will be non-zero, since all shares will have
3829 # make it start sooner than usual.
3830 lc = ss.lease_checker
3833 self.make_shares(ss)
3834 ss.setServiceParent(self.s)
# NOTE(review): `_wait` is passed to self.poll() just below, but no
# `def _wait():` header is visible in this view; the return statement that
# follows presumably forms its body -- confirm against the full file.
3836 return bool(lc.get_state()["last-cycle-finished"] is not None)
3837 d = self.poll(_wait)
# Callback run once polling succeeds; its argument is ignored.
3839 def _check(ignored):
# NOTE(review): `s` is read below without a visible assignment; presumably
# it holds the lease checker's state (lc.get_state()) -- confirm.
3841 last = s["history"][0]
3842 rec = last["space-recovered"]
3843 self.failUnlessEqual(rec["configured-buckets"], 4)
3844 self.failUnlessEqual(rec["configured-shares"], 4)
# The second argument is the failure message: the offending value itself.
3845 self.failUnless(rec["configured-sharebytes"] > 0,
3846 rec["configured-sharebytes"])
3847 # without the .st_blocks field in os.stat() results, we should be
3848 # reporting diskbytes==sharebytes
3849 self.failUnlessEqual(rec["configured-sharebytes"],
3850 rec["configured-diskbytes"])
3851 d.addCallback(_check)
# NOTE(review): no `return d` is visible after this point; presumably the
# test hands the Deferred back to trial -- confirm.
3854 def test_share_corruption(self):
# Purpose: overwrite the start of share "0" of the smallest storage index
# with "BAD MAGIC", add an empty bucket directory, and verify that the
# lease checker lists the corrupt share -- in its own state, in the JSON
# status and in the HTML status -- both right after the first bucket and
# after the first finished cycle.
# Error types stored for the polling helper to ignore.
# NOTE(review): the closing bracket of this list is not visible in this view.
3855 self._poll_should_ignore_these_errors = [
3856 UnknownMutableContainerVersionError,
3857 UnknownImmutableContainerVersionError,
3859 basedir = "storage/LeaseCrawler/share_corruption"
3860 fileutil.make_dirs(basedir)
3861 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3862 w = StorageStatus(ss)
3863 # make it start sooner than usual.
3864 lc = ss.lease_checker
3865 lc.stop_after_first_bucket = True
3869 # create a few shares, with some leases on them
3870 self.make_shares(ss)
3872 # now corrupt one, and make sure the lease-checker keeps going
3873 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
# The corrupted share belongs to the smallest storage index; its base32
# form is what the assertions below compare against.
3874 first = min(self.sis)
3875 first_b32 = base32.b2a(first)
3876 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
# NOTE(review): `f` is written to without a visible open()/close();
# presumably it is the share file `fn` opened for in-place update -- confirm.
3879 f.write("BAD MAGIC")
3881 # if get_share_file() doesn't see the correct mutable magic, it
3882 # assumes the file is an immutable share, and then
3883 # immutable.ShareFile sees a bad version. So regardless of which kind
3884 # of share we corrupted, this will trigger an
3885 # UnknownImmutableContainerVersionError.
3887 # also create an empty bucket
3888 empty_si = base32.b2a("\x04"*16)
3889 empty_bucket_dir = os.path.join(ss.sharedir,
3890 storage_index_to_dir(empty_si))
3891 fileutil.make_dirs(empty_bucket_dir)
3893 ss.setServiceParent(self.s)
3895 d = fireEventually()
3897 # now examine the state right after the first bucket has been
# Re-schedules itself through fireEventually() until the state contains
# "cycle-to-date", then checks the counters for the first bucket.
3899 def _after_first_bucket(ignored):
# NOTE(review): `s` is read below without a visible assignment; presumably
# lc.get_state() -- confirm. No return of `d2` is visible either.
3901 if "cycle-to-date" not in s:
3902 d2 = fireEventually()
3903 d2.addCallback(_after_first_bucket)
3905 so_far = s["cycle-to-date"]
3906 rec = so_far["space-recovered"]
3907 self.failUnlessEqual(rec["examined-buckets"], 1)
3908 self.failUnlessEqual(rec["examined-shares"], 0)
3909 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3910 d.addCallback(_after_first_bucket)
# Same check against the JSON rendering of the status page.
3912 d.addCallback(lambda ign: self.render_json(w))
3913 def _check_json(json):
3914 data = simplejson.loads(json)
3915 # grr. json turns all dict keys into strings.
3916 so_far = data["lease-checker"]["cycle-to-date"]
3917 corrupt_shares = so_far["corrupt-shares"]
3918 # it also turns all tuples into lists
3919 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3920 d.addCallback(_check_json)
# Same check against the HTML rendering, with markup stripped.
3921 d.addCallback(lambda ign: self.render1(w))
3922 def _check_html(html):
3923 s = remove_tags(html)
3924 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3925 d.addCallback(_check_html)
# NOTE(review): `_wait` is used by the poll below, but its `def` header is
# not visible; the return statement presumably forms its body -- confirm.
3928 return bool(lc.get_state()["last-cycle-finished"] is not None)
3929 d.addCallback(lambda ign: self.poll(_wait))
# After a finished cycle the same facts are read from history entry 0.
3931 def _after_first_cycle(ignored):
# NOTE(review): `s` again has no visible assignment here -- confirm.
3933 last = s["history"][0]
3934 rec = last["space-recovered"]
3935 self.failUnlessEqual(rec["examined-buckets"], 5)
3936 self.failUnlessEqual(rec["examined-shares"], 3)
3937 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3938 d.addCallback(_after_first_cycle)
3939 d.addCallback(lambda ign: self.render_json(w))
3940 def _check_json_history(json):
3941 data = simplejson.loads(json)
# The history key is the string "0" here, whereas the in-memory state
# above is indexed with the integer 0.
3942 last = data["lease-checker"]["history"]["0"]
3943 corrupt_shares = last["corrupt-shares"]
3944 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3945 d.addCallback(_check_json_history)
3946 d.addCallback(lambda ign: self.render1(w))
3947 def _check_html_history(html):
3948 s = remove_tags(html)
3949 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3950 d.addCallback(_check_html_history)
# Flush the logged container-version errors named below.
# NOTE(review): the function that wraps this call, how it is attached to
# `d`, and the final return are not visible in this view -- confirm.
3953 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3954 UnknownImmutableContainerVersionError)
3959 def render_json(self, page):
# Render `page` through self.render1() with the query argument t=json.
3960 d = self.render1(page, args={"t": ["json"]})
# NOTE(review): no `return d` is visible in this view, although callers in
# this file chain callbacks on the result of render_json() -- confirm.
3963 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
# Test case whose methods render StorageStatus pages and check their
# contents.
# NOTE(review): the `def` lines owning the statements below are not visible
# in this view. The first two create and start the MultiService kept in
# self.s (presumably in setUp); the last one stops it and returns the
# result (presumably in tearDown) -- confirm.
3966 self.s = service.MultiService()
3967 self.s.startService()
3969 return self.s.stopService()
def test_no_server(self):
    # A status page built without a storage server must say so in its
    # heading.
    page = StorageStatus(None).renderSynchronously()
    self.failUnlessIn("<h1>No Storage Server Running</h1>", page)
3976 def test_status(self):
# Purpose: start a StorageServer with default settings and check both the
# HTML status page and the JSON status document produced by StorageStatus.
3977 basedir = "storage/WebStatus/status"
3978 fileutil.make_dirs(basedir)
3979 ss = StorageServer(basedir, "\x00" * 20)
3980 ss.setServiceParent(self.s)
3981 w = StorageStatus(ss)
# NOTE(review): `d` is used below without a visible assignment; presumably
# it is the Deferred obtained by rendering `w` -- confirm.
3983 def _check_html(html):
3984 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
# The textual checks run against the page with markup removed.
3985 s = remove_tags(html)
3986 self.failUnlessIn("Accepting new shares: Yes", s)
3987 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3988 d.addCallback(_check_html)
3989 d.addCallback(lambda ign: self.render_json(w))
3990 def _check_json(json):
3991 data = simplejson.loads(json)
# NOTE(review): `s` is read below without a visible assignment inside this
# callback; presumably a sub-dictionary of `data` -- confirm.
3993 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3994 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3995 self.failUnlessIn("bucket-counter", data)
3996 self.failUnlessIn("lease-checker", data)
3997 d.addCallback(_check_json)
# NOTE(review): no `return d` is visible after this point -- confirm.
4000 def render_json(self, page):
# Render `page` through self.render1() with the query argument t=json.
4001 d = self.render1(page, args={"t": ["json"]})
# NOTE(review): no `return d` is visible in this view, although test_status
# chains a callback on the result of render_json() -- confirm.
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_no_disk_stats(self, mock_get_disk_stats):
    # Simulate a platform without any disk-stats API: the patched
    # get_disk_stats raises AttributeError. The status page must still
    # render, show "?" for the disk figures, keep accepting shares, and
    # get_available_space() must be None. (Runs on all platforms.)
    mock_get_disk_stats.side_effect = AttributeError()

    workdir = "storage/WebStatus/status_no_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)

    text = remove_tags(page)
    for expected in ("Accepting new shares: Yes",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnlessIn(expected, text)
    self.failUnless(server.get_available_space() is None)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_bad_disk_stats(self, mock_get_disk_stats):
    # The disk-stats API exists but the call fails: the patched
    # get_disk_stats raises OSError. The status page must then report that
    # no shares are accepted, show "?" for the disk figures, and
    # get_available_space() must be 0.
    mock_get_disk_stats.side_effect = OSError()

    workdir = "storage/WebStatus/status_bad_disk_stats"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)

    text = remove_tags(page)
    for expected in ("Accepting new shares: No",
                     "Total disk space: ?",
                     "Space Available to Tahoe: ?"):
        self.failUnlessIn(expected, text)
    self.failUnlessEqual(server.get_available_space(), 0)
4042 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4043 def test_status_right_disk_stats(self, mock_get_disk_stats):
# Purpose: make the patched get_disk_stats return fixed figures and check
# that the status page and get_available_space() reflect them.
# NOTE(review): `GB` and `total` are used below without a visible
# definition in this view -- confirm against the full file.
4046 free_for_root = 4*GB
4047 free_for_nonroot = 3*GB
4048 reserved_space = 1*GB
4049 used = total - free_for_root
# Available space is the non-root free space minus the reservation,
# clamped at zero.
4050 avail = max(free_for_nonroot - reserved_space, 0)
# NOTE(review): only two entries of this dictionary are visible, and its
# closing brace is not; presumably it also carries the figures computed
# above -- confirm.
4051 mock_get_disk_stats.return_value = {
4053 'free_for_root': free_for_root,
4054 'free_for_nonroot': free_for_nonroot,
4059 basedir = "storage/WebStatus/status_right_disk_stats"
4060 fileutil.make_dirs(basedir)
4061 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4062 expecteddir = ss.sharedir
4063 ss.setServiceParent(self.s)
4064 w = StorageStatus(ss)
4065 html = w.renderSynchronously()
# Every recorded call to the mock must have been made with exactly
# (expecteddir, reserved_space) and no keyword arguments; on failure the
# full call list is used as the message.
4067 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4068 mock_get_disk_stats.call_args_list)
4070 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4071 s = remove_tags(html)
4072 self.failUnlessIn("Total disk space: 5.00 GB", s)
4073 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4074 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4075 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4076 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4077 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4078 self.failUnlessEqual(ss.get_available_space(), 2*GB)
def test_readonly(self):
    # A server started with readonly_storage=True must show on its status
    # page that it is not accepting new shares.
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)
    self.failUnlessIn("Accepting new shares: No", remove_tags(page))
def test_reserved(self):
    # A server started with reserved_space=10e6 must show that reservation
    # on its status page, both abbreviated and as a plain byte count.
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)

    page = StorageStatus(server).renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", page)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)",
                      remove_tags(page))
def test_huge_reserved(self):
    # Check that the configured reserved space is shown on the status page,
    # both abbreviated and as a plain byte count.
    #
    # Use a working directory named after this test. The previous value,
    # "storage/WebStatus/reserved", is the directory test_reserved uses, so
    # the two tests were sharing on-disk server state.
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    # NOTE(review): despite the test name, this is the same 10e6 reservation
    # that test_reserved uses. Switching to a genuinely huge value needs the
    # expected rendering confirmed first, so value and assertion are left
    # unchanged here.
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnlessIn("<h1>Storage Server Status</h1>", html)
    # The textual check runs against the page with markup removed.
    s = remove_tags(html)
    self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
def test_util(self):
    # Exercise the space-rendering helpers of StorageStatus and the
    # module-level remove_prefix() helper.
    status = StorageStatus(None)
    render_cases = [
        (status.render_space, None, "?"),
        (status.render_space, 10e6, "10000000"),
        (status.render_abbrev_space, None, "?"),
        (status.render_abbrev_space, 10e6, "10.00 MB"),
    ]
    for render, size, expected in render_cases:
        self.failUnlessEqual(render(None, size), expected)

    # remove_prefix() strips a matching prefix and yields None otherwise.
    for prefix, expected in [("foo.", "bar"), ("baz.", None)]:
        self.failUnlessEqual(remove_prefix("foo.bar", prefix), expected)
4120 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)