1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
    # NOTE(review): these are methods of a fake Foolscap "canary" object;
    # the enclosing class statement is not visible in this view of the file.
    def __init__(self, ignore_disconnectors=False):
        # When True, disconnect registrations are presumably ignored -- the
        # early-return lines of notifyOnDisconnect are not visible; confirm.
        self.ignore = ignore_disconnectors
        # marker -> (callback, args, kwargs) for registered disconnect hooks
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # NOTE(review): the lines that create the marker `m` (and honor
        # self.ignore) are missing from this view -- confirm in full file.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # Unregister a callback previously handed to notifyOnDisconnect().
        del self.disconnectors[marker]
class FakeStatsProvider:
    # Minimal stand-in for a node stats provider, good enough for
    # StorageServer(stats_provider=...) in these tests.
    # NOTE(review): the method bodies (presumably no-op `pass` lines) are
    # not visible in this view of the file.
    def count(self, name, delta=1):
        """Accept a counter increment."""
    def register_producer(self, producer):
        """Accept a stats-producer registration."""
class Bucket(unittest.TestCase):
    """Exercises BucketWriter/BucketReader against share files on disk.

    The TestCase instance itself is handed to BucketWriter/BucketReader as
    the server object, so it supplies the small callback surface below.
    """
    def make_workdir(self, name):
        # Build storage/Bucket/<name>/{tmp/,} and return the (incoming,
        # final) share-file paths a BucketWriter needs.
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        """Server hook called when a BucketWriter closes."""
    def add_latency(self, category, latency):
        """Server hook for recording latency samples."""
    def count(self, name, delta=1):
        """Server hook for counter increments."""

    # NOTE(review): the `def make_lease(self):` header and the owner_num
    # assignment are missing from this view of the file.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        # Write 82 bytes into a 200-byte bucket in four chunks.
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        # NOTE(review): the trailing constructor-argument line (canary) and
        # the final remote_close() are missing from this view.
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        # NOTE(review): the trailing constructor-argument line (canary) and
        # the remote_close() call are missing from this view.
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short

        # now read everything back from the finalized share file
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file)

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.
        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        # NOTE(review): the `share_data` assignment line (presumably 'a',
        # per the comment above) is missing from this view.

        ownernumber = struct.pack('>L', 0)
        renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        # NOTE(review): `mock` is not among the imports visible in this view.
        mockstorageserver = mock.Mock()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)
    # NOTE(review): a method of a fake remote-bucket wrapper; the enclosing
    # class statement and several interior lines are missing from this view.
    def callRemote(self, methname, *args, **kwargs):
        # NOTE(review): the `def _call():` header is missing; the next two
        # lines are its body (dispatch to self.target.remote_<methname>).
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        # NOTE(review): the read-counter increment under the slot_readv
        # branch is missing from this view.
        if methname == "slot_readv":
        if "writev" in methname:
            self.write_count += 1
        return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
    """Tests WriteBucketProxy/ReadBucketProxy share-layout round trips."""
    def make_bucket(self, name, size):
        # Create a BucketWriter plus the remote-bucket wrapper the proxies
        # talk to. NOTE(review): the lines constructing the RemoteBucket
        # wrapper and the return statement are missing from this view.
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),

    def make_lease(self):
        # NOTE(review): the `owner_num` assignment is missing from this view.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        """Server hook called when a BucketWriter closes."""
    def add_latency(self, category, latency):
        """Server hook for recording latency samples."""
    def count(self, name, delta=1):
        """Server hook for counter increments."""

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
        # NOTE(review): most WriteBucketProxy constructor-argument lines are
        # missing from this view.
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        # NOTE(review): the `for i in range(...)]` tails of the next three
        # comprehensions are missing from this view.
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
        # NOTE(review): intervening constructor-argument lines and the
        # initial creation of the Deferred `d` are missing from this view.
                       uri_extension_size_max=len(uri_extension))

        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
            # NOTE(review): the `return d1` line is missing from this view.

        d.addCallback(_start_reading)
        # NOTE(review): the final `return d` line is missing from this view.
291 def test_readwrite_v1(self):
292 return self._do_test_readwrite("test_readwrite_v1",
293 0x24, WriteBucketProxy, ReadBucketProxy)
295 def test_readwrite_v2(self):
296 return self._do_test_readwrite("test_readwrite_v2",
297 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class Server(unittest.TestCase):
    """Tests for the immutable-share side of StorageServer."""
    # NOTE(review): the `def setUp(self):` and `def tearDown(self):` headers
    # are missing from this view; the lines below are their bodies.
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        # Per-test directory under storage/Server/.
        basedir = os.path.join("storage", "Server", name)
        # NOTE(review): the `return basedir` line is missing from this view.

    def create(self, name, reserved_space=0, klass=StorageServer):
        # Build a StorageServer (with a fake stats provider) parented to
        # self.sparent. NOTE(review): `return ss` is missing from this view.
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)

    def test_create(self):
        """Smoke test: server construction succeeds."""
        self.create("test_create")
322 def test_declares_fixed_1528(self):
323 ss = self.create("test_declares_fixed_1528")
324 ver = ss.remote_get_version()
325 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
326 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        # Helper: allocate buckets with fresh lease secrets drawn from the
        # monotonically increasing self._lease_secret counter.
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): the `if not canary:` guard line is missing from this
        # view; this assignment presumably only runs when canary is None.
        canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        # Shares past 4 GiB require sparse-file support to be testable.
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")

        avail = fileutil.get_available_space('.', 512*2**20)
        # NOTE(review): the free-space guard (`if avail <= ...:`) line is
        # missing from this view.
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        # NOTE(review): the opening/closing triple quotes of this docstring
        # are missing from this view.
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        # NOTE(review): the remaining os.path.join argument line is missing.
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        # After closing, the incoming/ bucket and prefix directories should
        # be cleaned up, while incoming/ itself survives.
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)
401 def test_abort(self):
402 # remote_abort, when called on a writer, should make sure that
403 # the allocated size of the bucket is not counted by the storage
404 # server when accounting for space.
405 ss = self.create("test_abort")
406 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
407 self.failIfEqual(ss.allocated_size(), 0)
409 # Now abort the writers.
410 for writer in writers.itervalues():
411 writer.remote_abort()
412 self.failUnlessEqual(ss.allocated_size(), 0)
    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): the wb.remote_close()/wb.remote_abort() lines
            # are missing from this view.
            # aborting a bucket that was already closed is a no-op

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        # NOTE(review): the `b_str = str(b[0])` line is missing from this
        # view.
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            # NOTE(review): the wb.remote_abort() bodies of this and the two
            # loops below are missing from this view.
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
        for i,wb in writers.items():

    def test_bad_container_version(self):
        # An immutable share with an unknown container version must raise
        # UnknownImmutableContainerVersionError on read.
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        # NOTE(review): the w[0].remote_close() line is missing from this
        # view.

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        # NOTE(review): the lines that open `fn` for rewriting are missing
        # from this view.
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            # NOTE(review): the loop body that fires the disconnect
            # callbacks (and subsequent cleanup lines) is missing from this
            # view.

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        # NOTE(review): the `reserved_space` assignment (10000, per the
        # comments below) is missing from this view.
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
        # NOTE(review): the closing `}` line is missing from this view.
        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        # NOTE(review): the `OVERHEAD = ...` assignment is missing from this
        # view.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # NOTE(review): the lines that abandon/release the first set of
        # writers are missing from this view.
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            # NOTE(review): the bw.remote_close() line is missing from this
            # view.
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
        # NOTE(review): the closing `}` line is missing from this view.

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        # NOTE(review): the lines that release writers3 are missing from
        # this view.
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()

    # NOTE(review): the `def test_seek_behavior(self):` header is missing
    # from this view; the lines below are its body.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        # NOTE(review): the initial write ("start", per the final assert)
        # and f.close() lines are missing from this view.

        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        # NOTE(review): the seek/write/close lines that grow the file to
        # 103 bytes are missing from this view.
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        # NOTE(review): the `sharenums`/`size` assignments are missing from
        # this view.
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.failIf(hasattr(ss, 'remote_cancel_lease'), \
                    "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                      sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
683 def test_readonly(self):
684 workdir = self.workdir("test_readonly")
685 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
686 ss.setServiceParent(self.sparent)
688 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
689 self.failUnlessEqual(already, set())
690 self.failUnlessEqual(writers, {})
692 stats = ss.get_stats()
693 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
694 if "storage_server.disk_avail" in stats:
695 # Some platforms may not have an API to get disk stats.
696 # But if there are stats, readonly_storage means disk_avail=0
697 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): the wb.remote_close() line is missing from this
            # view.

        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        # Corruption advisories are written as files under
        # <workdir>/corruption-advisories.
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        # NOTE(review): the `report = f.read()` / `f.close()` lines are
        # missing from this view.
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        # NOTE(review): the `report = f.read()` / `f.close()` lines are
        # missing from this view.
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)
class MutableServer(unittest.TestCase):
    """Tests for the mutable-slot side of StorageServer."""
    # NOTE(review): the `def setUp(self):` and `def tearDown(self):` headers
    # are missing from this view; the lines below are their bodies.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        # Per-test directory under storage/MutableServer/.
        basedir = os.path.join("storage", "MutableServer", name)
        # NOTE(review): the `return basedir` line is missing from this view.

    def create(self, name):
        # Build a StorageServer parented to self.sparent.
        # NOTE(review): `return ss` is missing from this view.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_create(self):
        """Smoke test: server construction succeeds."""
        self.create("test_create")
785 def write_enabler(self, we_tag):
786 return hashutil.tagged_hash("we_blah", we_tag)
788 def renew_secret(self, tag):
789 return hashutil.tagged_hash("renew_blah", str(tag))
791 def cancel_secret(self, tag):
792 return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        # Helper: create empty mutable slots via an empty test-and-write
        # vector for each requested share number.
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )

        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        # NOTE(review): the remaining rstaraw() argument lines (the
        # test-and-write and read vectors) are missing from this view.
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_bad_magic(self):
        # A mutable share with corrupted magic must raise
        # UnknownMutableContainerVersionError on read.
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        # NOTE(review): the lines that open `fn` and overwrite its magic
        # bytes are missing from this view.
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        # NOTE(review): the sharenums/size argument line is missing here.
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        # NOTE(review): the read-vector argument line is missing here.
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
        # NOTE(review): the read-vector argument line is missing here.

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})
    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        # NOTE(review): the sharenums/size argument line is missing here.

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        # NOTE(review): the expected-value continuation line is missing.
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
        # NOTE(review): the expected-value continuation line is missing.

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        # NOTE(review): the read-vector argument line is missing here.
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        # NOTE(review): the trailing argument line is missing here.
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        # NOTE(review): the remaining test-vector, write-vector, and
        # expected-answer lines are missing from this view.
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        # NOTE(review): this write is presumably the body of a reset helper;
        # its `def` header and trailing argument line are missing from this
        # view, as are the per-operator test-vector continuation lines and
        # reset calls throughout the rest of this method.
        write("si1", secrets,
              {0: ([], [(0,data)], None)},

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        # NOTE(review): this test method continues beyond the end of this
        # view (further comparison-operator cases follow).
1077 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1082 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1083 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1086 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1091 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1092 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1095 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1100 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1101 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1104 # finally, test some operators against empty shares
1105 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1110 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1111 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1114 def test_readv(self):
1115 ss = self.create("test_readv")
1116 secrets = ( self.write_enabler("we1"),
1117 self.renew_secret("we1"),
1118 self.cancel_secret("we1") )
1119 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1120 write = ss.remote_slot_testv_and_readv_and_writev
1121 read = ss.remote_slot_readv
1122 data = [("%d" % i) * 100 for i in range(3)]
1123 rc = write("si1", secrets,
1124 {0: ([], [(0,data[0])], None),
1125 1: ([], [(0,data[1])], None),
1126 2: ([], [(0,data[2])], None),
1128 self.failUnlessEqual(rc, (True, {}))
1130 answer = read("si1", [], [(0, 10)])
1131 self.failUnlessEqual(answer, {0: ["0"*10],
1135 def compare_leases_without_timestamps(self, leases_a, leases_b):
# Assert that two lease lists are pairwise equal on every field EXCEPT
# expiration_time (used after operations that legitimately re-stamp leases).
1136 self.failUnlessEqual(len(leases_a), len(leases_b))
1137 for i in range(len(leases_a)):
# NOTE(review): this listing is missing the lines (orig. 1138-1139) that bind
# `a` and `b` — presumably a = leases_a[i]; b = leases_b[i]. Confirm against
# the full source before relying on this excerpt.
1140 self.failUnlessEqual(a.owner_num, b.owner_num)
1141 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1142 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1143 self.failUnlessEqual(a.nodeid, b.nodeid)
1145 def compare_leases(self, leases_a, leases_b):
# Strict variant of compare_leases_without_timestamps: also requires the
# expiration_time of each lease pair to match exactly.
1146 self.failUnlessEqual(len(leases_a), len(leases_b))
1147 for i in range(len(leases_a)):
# NOTE(review): the `a`/`b` binding lines (orig. 1148-1149) are absent from
# this listing — presumably a = leases_a[i]; b = leases_b[i]. Verify.
1150 self.failUnlessEqual(a.owner_num, b.owner_num)
1151 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1152 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1153 self.failUnlessEqual(a.nodeid, b.nodeid)
1154 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1156 def test_leases(self):
1157 ss = self.create("test_leases")
1159 return ( self.write_enabler("we1"),
1160 self.renew_secret("we1-%d" % n),
1161 self.cancel_secret("we1-%d" % n) )
1162 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1163 write = ss.remote_slot_testv_and_readv_and_writev
1164 read = ss.remote_slot_readv
1165 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1166 self.failUnlessEqual(rc, (True, {}))
1168 # create a random non-numeric file in the bucket directory, to
1169 # exercise the code that's supposed to ignore those.
1170 bucket_dir = os.path.join(self.workdir("test_leases"),
1171 "shares", storage_index_to_dir("si1"))
1172 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1173 f.write("you ought to be ignoring me\n")
1176 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1177 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1179 # add-lease on a missing storage index is silently ignored
1180 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1182 # re-allocate the slots and use the same secrets, that should update
1184 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1185 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1188 ss.remote_renew_lease("si1", secrets(0)[1])
1189 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1191 # now allocate them with a bunch of different secrets, to trigger the
1192 # extended lease code. Use add_lease for one of them.
1193 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1194 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1195 secrets2 = secrets(2)
1196 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1197 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1198 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1199 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1200 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1202 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1204 all_leases = list(s0.get_leases())
1205 # and write enough data to expand the container, forcing the server
1206 # to move the leases
1207 write("si1", secrets(0),
1208 {0: ([], [(0,data)], 200), },
1211 # read back the leases, make sure they're still intact.
1212 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1214 ss.remote_renew_lease("si1", secrets(0)[1])
1215 ss.remote_renew_lease("si1", secrets(1)[1])
1216 ss.remote_renew_lease("si1", secrets(2)[1])
1217 ss.remote_renew_lease("si1", secrets(3)[1])
1218 ss.remote_renew_lease("si1", secrets(4)[1])
1219 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1220 # get a new copy of the leases, with the current timestamps. Reading
1221 # data and failing to renew/cancel leases should leave the timestamps
1223 all_leases = list(s0.get_leases())
1224 # renewing with a bogus token should prompt an error message
1226 # examine the exception thus raised, make sure the old nodeid is
1227 # present, to provide for share migration
1228 e = self.failUnlessRaises(IndexError,
1229 ss.remote_renew_lease, "si1",
1232 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1233 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1234 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1236 self.compare_leases(all_leases, list(s0.get_leases()))
1238 # reading shares should not modify the timestamp
1239 read("si1", [], [(0,200)])
1240 self.compare_leases(all_leases, list(s0.get_leases()))
1242 write("si1", secrets(0),
1243 {0: ([], [(200, "make me bigger")], None)}, [])
1244 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1246 write("si1", secrets(0),
1247 {0: ([], [(500, "make me really bigger")], None)}, [])
1248 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1250 def test_remove(self):
1251 ss = self.create("test_remove")
1252 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1254 readv = ss.remote_slot_readv
1255 writev = ss.remote_slot_testv_and_readv_and_writev
1256 secrets = ( self.write_enabler("we1"),
1257 self.renew_secret("we1"),
1258 self.cancel_secret("we1") )
1259 # delete sh0 by setting its size to zero
1260 answer = writev("si1", secrets,
1263 # the answer should mention all the shares that existed before the
1265 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1266 # but a new read should show only sh1 and sh2
1267 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1270 # delete sh1 by setting its size to zero
1271 answer = writev("si1", secrets,
1274 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1275 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1278 # delete sh2 by setting its size to zero
1279 answer = writev("si1", secrets,
1282 self.failUnlessEqual(answer, (True, {2:[]}) )
1283 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1285 # and the bucket directory should now be gone
1286 si = base32.b2a("si1")
1287 # note: this is a detail of the storage server implementation, and
1288 # may change in the future
1290 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1291 bucketdir = os.path.join(prefixdir, si)
1292 self.failUnless(os.path.exists(prefixdir), prefixdir)
1293 self.failIf(os.path.exists(bucketdir), bucketdir)
1296 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1298 self.sparent = LoggingServiceParent()
1299 self._lease_secret = itertools.count()
1300 self.ss = self.create("MDMFProxies storage test server")
1301 self.rref = RemoteBucket()
1302 self.rref.target = self.ss
1303 self.secrets = (self.write_enabler("we_secret"),
1304 self.renew_secret("renew_secret"),
1305 self.cancel_secret("cancel_secret"))
1306 self.segment = "aaaaaa"
1308 self.salt = "a" * 16
1309 self.block_hash = "a" * 32
1310 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1311 self.share_hash = self.block_hash
1312 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1313 self.signature = "foobarbaz"
1314 self.verification_key = "vvvvvv"
1315 self.encprivkey = "private"
1316 self.root_hash = self.block_hash
1317 self.salt_hash = self.root_hash
1318 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1319 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1320 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1321 # blockhashes and salt hashes are serialized in the same way,
1322 # only we lop off the first element and store that in the
1324 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1328 self.sparent.stopService()
1329 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
def write_enabler(self, we_tag):
    """Return a deterministic write-enabler secret derived from *we_tag*."""
    # Same derivation as before: a tagged hash in the "we_blah" namespace.
    # Unlike renew_secret/cancel_secret, the tag is not str()-wrapped; callers
    # are expected to pass strings already.
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Return a deterministic lease-renewal secret for *tag*."""
    # Tags may be non-strings (e.g. ints from an itertools.count), so
    # normalize before hashing, exactly as the original did.
    tag_text = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_text)
def cancel_secret(self, tag):
    """Return a deterministic lease-cancel secret for *tag*."""
    # Mirror of renew_secret, but in the "cancel_blah" hash namespace.
    tag_text = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_text)
1344 def workdir(self, name):
# Build the per-test working directory path under storage/MutableServer/.
1345 basedir = os.path.join("storage", "MutableServer", name)
# NOTE(review): the return statement (orig. line ~1346, presumably
# `return basedir`) is missing from this listing — confirm.
1349 def create(self, name):
# Create a StorageServer rooted at this test's workdir, with an all-zero
# 20-byte nodeid, parented under self.sparent so tearDown stops it.
1350 workdir = self.workdir(name)
1351 ss = StorageServer(workdir, "\x00" * 20)
1352 ss.setServiceParent(self.sparent)
# NOTE(review): the trailing `return ss` (orig. line ~1353) is missing from
# this listing — callers clearly use the return value, so confirm.
1356 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1357 # Start with the checkstring
1358 data = struct.pack(">BQ32s",
1362 self.checkstring = data
1363 # Next, the encoding parameters
1365 data += struct.pack(">BBQQ",
1371 data += struct.pack(">BBQQ",
1377 data += struct.pack(">BBQQ",
1382 # Now we'll build the offsets.
1384 if not tail_segment and not empty:
1386 sharedata += self.salt + self.block
1389 sharedata += self.salt + self.block
1390 sharedata += self.salt + "a"
1392 # The encrypted private key comes after the shares + salts
1393 offset_size = struct.calcsize(MDMFOFFSETS)
1394 encrypted_private_key_offset = len(data) + offset_size
1395 # The share has chain comes after the private key
1396 sharehashes_offset = encrypted_private_key_offset + \
1397 len(self.encprivkey)
1399 # The signature comes after the share hash chain.
1400 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1402 verification_key_offset = signature_offset + len(self.signature)
1403 verification_key_end = verification_key_offset + \
1404 len(self.verification_key)
1406 share_data_offset = offset_size
1407 share_data_offset += PRIVATE_KEY_SIZE
1408 share_data_offset += SIGNATURE_SIZE
1409 share_data_offset += VERIFICATION_KEY_SIZE
1410 share_data_offset += SHARE_HASH_CHAIN_SIZE
1412 blockhashes_offset = share_data_offset + len(sharedata)
1413 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1415 data += struct.pack(MDMFOFFSETS,
1416 encrypted_private_key_offset,
1419 verification_key_offset,
1420 verification_key_end,
1426 self.offsets['enc_privkey'] = encrypted_private_key_offset
1427 self.offsets['block_hash_tree'] = blockhashes_offset
1428 self.offsets['share_hash_chain'] = sharehashes_offset
1429 self.offsets['signature'] = signature_offset
1430 self.offsets['verification_key'] = verification_key_offset
1431 self.offsets['share_data'] = share_data_offset
1432 self.offsets['verification_key_end'] = verification_key_end
1433 self.offsets['EOF'] = eof_offset
1436 data += self.encprivkey
1438 data += self.share_hash_chain_s
1440 data += self.signature
1441 # and the verification key
1442 data += self.verification_key
1443 # Then we'll add in gibberish until we get to the right point.
1444 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1447 # Then the share data
1450 data += self.block_hash_tree_s
1454 def write_test_share_to_server(self,
# NOTE(review): the remaining signature lines (orig. 1455-1458) are missing;
# presumably (storage_index, tail_segment=False, empty=False) given the call
# on orig. line 1465 and how tests invoke this helper — confirm.
1459 I write some data for the read tests to read to self.ss
1461 If tail_segment=True, then I will write a share that has a
1462 smaller tail segment than other segments.
1464 write = self.ss.remote_slot_testv_and_readv_and_writev
1465 data = self.build_test_mdmf_share(tail_segment, empty)
1466 # Finally, we write the whole thing to the storage server in one
1468 testvs = [(0, 1, "eq", "")]
# NOTE(review): the initializations of `tws` (a dict) and `readv` (orig.
# 1469/1471) are missing from this listing.
1470 tws[0] = (testvs, [(0, data)], None)
1472 results = write(storage_index, self.secrets, tws, readv)
# results[0] is the overall success flag from testv_and_readv_and_writev.
1473 self.failUnless(results[0])
1476 def build_test_sdmf_share(self, empty=False):
# Construct an SDMF-format share byte string by hand: prefix (checkstring +
# encoding parameters), offset table, then the share sections in SDMF order
# (verification key, signature, share hash chain, block hash tree, share
# data, encrypted private key). Also records the section offsets in
# self.offsets for the tests to check against.
1480 sharedata = self.segment * 6
1481 self.sharedata = sharedata
1482 blocksize = len(sharedata) / 3
1483 block = sharedata[:blocksize]
1484 self.blockdata = block
1485 prefix = struct.pack(">BQ32s16s BBQQ",
# NOTE(review): the prefix field values (orig. 1486-1494: version, seqnum,
# root hash, salt, k, N, segsize, datalen) are missing from this listing.
# The offset table starts right after the fixed-size header.
1495 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
# Each section's offset is the previous offset plus the previous section's
# length, in SDMF on-disk order.
1496 signature_offset = post_offset + len(self.verification_key)
1497 sharehashes_offset = signature_offset + len(self.signature)
1498 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1499 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1500 encprivkey_offset = sharedata_offset + len(block)
1501 eof_offset = encprivkey_offset + len(self.encprivkey)
1502 offsets = struct.pack(">LLLLQQ",
# NOTE(review): the packed offset arguments (orig. 1503-1508) are missing.
1509 final_share = "".join([prefix,
# NOTE(review): `offsets` (orig. 1510) and the trailing pieces (block,
# encprivkey — orig. 1515-1516) appear to be missing from this join listing.
1511 self.verification_key,
1513 self.share_hash_chain_s,
1514 self.block_hash_tree_s,
# Record the computed offsets so tests can verify reads at exact positions.
1518 self.offsets['signature'] = signature_offset
1519 self.offsets['share_hash_chain'] = sharehashes_offset
1520 self.offsets['block_hash_tree'] = blockhashes_offset
1521 self.offsets['share_data'] = sharedata_offset
1522 self.offsets['enc_privkey'] = encprivkey_offset
1523 self.offsets['EOF'] = eof_offset
# NOTE(review): the `return final_share` (orig. ~1524) is missing here.
1527 def write_sdmf_share_to_server(self,
# NOTE(review): remaining signature lines (orig. 1528-1529, presumably
# storage_index and empty=False) are missing from this listing.
1530 # Some tests need SDMF shares to verify that we can still
1531 # read them. This method writes one, which resembles but is not
# (continuation of the original comment is missing at orig. 1532)
1533 write = self.ss.remote_slot_testv_and_readv_and_writev
1534 share = self.build_test_sdmf_share(empty)
# Require the slot to be empty before writing (test vector: byte 0..1 == "").
1535 testvs = [(0, 1, "eq", "")]
# NOTE(review): initializations of `tws` and `readv` (orig. 1536/1538) are
# missing from this listing.
1537 tws[0] = (testvs, [(0, share)], None)
1539 results = write(storage_index, self.secrets, tws, readv)
1540 self.failUnless(results[0])
1543 def test_read(self):
# End-to-end read test: write a hand-built MDMF share, then verify that
# every MDMFSlotReadProxy accessor returns exactly the values the share was
# built from (blocks+salts, encprivkey, hash trees, signature, key,
# seqnum, root hash, encoding parameters, checkstring).
1544 self.write_test_share_to_server("si1")
1545 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1546 # Check that every method equals what we expect it to.
1547 d = defer.succeed(None)
# Python 2 tuple-unpacking parameter: receives (block, salt) pairs.
1548 def _check_block_and_salt((block, salt)):
1549 self.failUnlessEqual(block, self.block)
1550 self.failUnlessEqual(salt, self.salt)
# NOTE(review): the `for i in xrange(6):` loop header (orig. ~1552) is
# missing from this listing; i=i default-binds the loop variable.
1553 d.addCallback(lambda ignored, i=i:
1554 mr.get_block_and_salt(i))
1555 d.addCallback(_check_block_and_salt)
1557 d.addCallback(lambda ignored:
1558 mr.get_encprivkey())
1559 d.addCallback(lambda encprivkey:
1560 self.failUnlessEqual(self.encprivkey, encprivkey))
1562 d.addCallback(lambda ignored:
1563 mr.get_blockhashes())
1564 d.addCallback(lambda blockhashes:
1565 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1567 d.addCallback(lambda ignored:
1568 mr.get_sharehashes())
1569 d.addCallback(lambda sharehashes:
1570 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1572 d.addCallback(lambda ignored:
# NOTE(review): the call line (orig. 1573, presumably mr.get_signature())
# is missing from this listing.
1574 d.addCallback(lambda signature:
1575 self.failUnlessEqual(signature, self.signature))
1577 d.addCallback(lambda ignored:
1578 mr.get_verification_key())
1579 d.addCallback(lambda verification_key:
1580 self.failUnlessEqual(verification_key, self.verification_key))
1582 d.addCallback(lambda ignored:
# NOTE(review): missing call line (orig. 1583, presumably mr.get_seqnum()).
1584 d.addCallback(lambda seqnum:
1585 self.failUnlessEqual(seqnum, 0))
1587 d.addCallback(lambda ignored:
# NOTE(review): missing call line (orig. 1588, presumably mr.get_root_hash()).
1589 d.addCallback(lambda root_hash:
1590 self.failUnlessEqual(self.root_hash, root_hash))
1592 d.addCallback(lambda ignored:
# NOTE(review): missing call line (orig. 1593); result checked as seqnum==0.
1594 d.addCallback(lambda seqnum:
1595 self.failUnlessEqual(0, seqnum))
1597 d.addCallback(lambda ignored:
1598 mr.get_encoding_parameters())
1599 def _check_encoding_parameters((k, n, segsize, datalen)):
1600 self.failUnlessEqual(k, 3)
1601 self.failUnlessEqual(n, 10)
1602 self.failUnlessEqual(segsize, 6)
1603 self.failUnlessEqual(datalen, 36)
1604 d.addCallback(_check_encoding_parameters)
1606 d.addCallback(lambda ignored:
1607 mr.get_checkstring())
1608 d.addCallback(lambda checkstring:
# NOTE(review): this compares `checkstring` to itself — almost certainly a
# latent bug; it was presumably meant to be self.checkstring (compare
# test_get_checkstring_first). Flagging only; not changing test behavior.
1609 self.failUnlessEqual(checkstring, checkstring))
# NOTE(review): the trailing `return d` (orig. ~1610) is missing here.
1613 def test_read_with_different_tail_segment_size(self):
# A share built with tail_segment=True has a 1-byte final block; verify the
# reader returns that short block (segment index 5 of 6) correctly.
1614 self.write_test_share_to_server("si1", tail_segment=True)
1615 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1616 d = mr.get_block_and_salt(5)
1617 def _check_tail_segment(results):
1618 block, salt = results
1619 self.failUnlessEqual(len(block), 1)
1620 self.failUnlessEqual(block, "a")
1621 d.addCallback(_check_tail_segment)
# NOTE(review): the trailing `return d` (orig. ~1622) is missing from this
# listing — trial needs the Deferred returned to wait on it.
1625 def test_get_block_with_invalid_segnum(self):
# Asking for a segment past the end (index 7 of a 6-segment share) must
# raise LayoutInvalid rather than return garbage.
1626 self.write_test_share_to_server("si1")
1627 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1628 d = defer.succeed(None)
1629 d.addCallback(lambda ignored:
1630 self.shouldFail(LayoutInvalid, "test invalid segnum",
# (orig. 1631, the `None` substring argument to shouldFail, is missing)
1632 mr.get_block_and_salt, 7))
# NOTE(review): trailing `return d` (orig. ~1633) is missing from this listing.
1636 def test_get_encoding_parameters_first(self):
# The encoding parameters must be readable as the FIRST operation on a
# fresh proxy (no prior reads to prime its cache).
1637 self.write_test_share_to_server("si1")
1638 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1639 d = mr.get_encoding_parameters()
# Expected values match the share built by build_test_mdmf_share:
# k=3, N=10, 6-byte segments, 36 bytes of data.
1640 def _check_encoding_parameters((k, n, segment_size, datalen)):
1641 self.failUnlessEqual(k, 3)
1642 self.failUnlessEqual(n, 10)
1643 self.failUnlessEqual(segment_size, 6)
1644 self.failUnlessEqual(datalen, 36)
1645 d.addCallback(_check_encoding_parameters)
# NOTE(review): trailing `return d` (orig. ~1646) is missing from this listing.
1649 def test_get_seqnum_first(self):
# The sequence number must be readable as the first operation on the proxy.
1650 self.write_test_share_to_server("si1")
1651 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
# NOTE(review): the line creating the Deferred (orig. 1652, presumably
# `d = mr.get_seqnum()`) is missing from this listing.
1653 d.addCallback(lambda seqnum:
1654 self.failUnlessEqual(seqnum, 0))
# NOTE(review): trailing `return d` (orig. ~1655) is missing as well.
1658 def test_get_root_hash_first(self):
# The root hash must be readable as the first operation on the proxy.
1659 self.write_test_share_to_server("si1")
1660 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1661 d = mr.get_root_hash()
1662 d.addCallback(lambda root_hash:
1663 self.failUnlessEqual(root_hash, self.root_hash))
# NOTE(review): trailing `return d` (orig. ~1664) is missing from this listing.
1667 def test_get_checkstring_first(self):
# The checkstring must be readable as the first operation on the proxy and
# match the one recorded while building the share.
1668 self.write_test_share_to_server("si1")
1669 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1670 d = mr.get_checkstring()
1671 d.addCallback(lambda checkstring:
1672 self.failUnlessEqual(checkstring, self.checkstring))
# NOTE(review): trailing `return d` (orig. ~1673) is missing from this listing.
1676 def test_write_read_vectors(self):
1677 # When writing for us, the storage server will return to us a
1678 # read vector, along with its result. If a write fails because
1679 # the test vectors failed, this read vector can help us to
1680 # diagnose the problem. This test ensures that the read vector
1681 # is working appropriately.
1682 mw = self._make_new_mw("si1", 0)
1685 mw.put_block(self.block, i, self.salt)
1686 mw.put_encprivkey(self.encprivkey)
1687 mw.put_blockhashes(self.block_hash_tree)
1688 mw.put_sharehashes(self.share_hash_chain)
1689 mw.put_root_hash(self.root_hash)
1690 mw.put_signature(self.signature)
1691 mw.put_verification_key(self.verification_key)
1692 d = mw.finish_publishing()
1694 self.failUnless(len(results), 2)
1695 result, readv = results
1696 self.failUnless(result)
1698 self.old_checkstring = mw.get_checkstring()
1699 mw.set_checkstring("")
1700 d.addCallback(_then)
1701 d.addCallback(lambda ignored:
1702 mw.finish_publishing())
1703 def _then_again(results):
1704 self.failUnlessEqual(len(results), 2)
1705 result, readvs = results
1707 self.failUnlessIn(0, readvs)
1708 readv = readvs[0][0]
1709 self.failUnlessEqual(readv, self.old_checkstring)
1710 d.addCallback(_then_again)
1711 # The checkstring remains the same for the rest of the process.
1715 def test_private_key_after_share_hash_chain(self):
# The MDMF writer enforces section ordering: once the share hash chain is
# written, re-writing the private key must raise LayoutInvalid.
1716 mw = self._make_new_mw("si1", 0)
1717 d = defer.succeed(None)
# NOTE(review): the loop header (orig. ~1718, presumably
# `for i in xrange(6):`) is missing from this listing.
1719 d.addCallback(lambda ignored, i=i:
1720 mw.put_block(self.block, i, self.salt))
1721 d.addCallback(lambda ignored:
1722 mw.put_encprivkey(self.encprivkey))
1723 d.addCallback(lambda ignored:
1724 mw.put_sharehashes(self.share_hash_chain))
1726 # Now try to put the private key again.
1727 d.addCallback(lambda ignored:
1728 self.shouldFail(LayoutInvalid, "test repeat private key",
# (orig. 1729, the `None` substring argument to shouldFail, is missing)
1730 mw.put_encprivkey, self.encprivkey))
# NOTE(review): trailing `return d` (orig. ~1731) is missing from this listing.
1734 def test_signature_after_verification_key(self):
1735 mw = self._make_new_mw("si1", 0)
1736 d = defer.succeed(None)
1737 # Put everything up to and including the verification key.
1739 d.addCallback(lambda ignored, i=i:
1740 mw.put_block(self.block, i, self.salt))
1741 d.addCallback(lambda ignored:
1742 mw.put_encprivkey(self.encprivkey))
1743 d.addCallback(lambda ignored:
1744 mw.put_blockhashes(self.block_hash_tree))
1745 d.addCallback(lambda ignored:
1746 mw.put_sharehashes(self.share_hash_chain))
1747 d.addCallback(lambda ignored:
1748 mw.put_root_hash(self.root_hash))
1749 d.addCallback(lambda ignored:
1750 mw.put_signature(self.signature))
1751 d.addCallback(lambda ignored:
1752 mw.put_verification_key(self.verification_key))
1753 # Now try to put the signature again. This should fail
1754 d.addCallback(lambda ignored:
1755 self.shouldFail(LayoutInvalid, "signature after verification",
1757 mw.put_signature, self.signature))
1761 def test_uncoordinated_write(self):
1762 # Make two mutable writers, both pointing to the same storage
1763 # server, both at the same storage index, and try writing to the
# (continuation of the comment, orig. 1764, is missing: "...same share.")
1765 mw1 = self._make_new_mw("si1", 0)
1766 mw2 = self._make_new_mw("si1", 0)
# First publish should succeed; the second writer's test vector (expecting
# an empty slot) must then fail, modeling an uncoordinated write collision.
1768 def _check_success(results):
1769 result, readvs = results
1770 self.failUnless(result)
1772 def _check_failure(results):
1773 result, readvs = results
# NOTE(review): the body line asserting failure (orig. ~1774, presumably
# self.failIf(result)) is missing from this listing.
1776 def _write_share(mw):
# NOTE(review): the loop header (orig. ~1777, presumably
# `for i in xrange(6):`) is missing from this listing.
1778 mw.put_block(self.block, i, self.salt)
1779 mw.put_encprivkey(self.encprivkey)
1780 mw.put_blockhashes(self.block_hash_tree)
1781 mw.put_sharehashes(self.share_hash_chain)
1782 mw.put_root_hash(self.root_hash)
1783 mw.put_signature(self.signature)
1784 mw.put_verification_key(self.verification_key)
1785 return mw.finish_publishing()
1786 d = _write_share(mw1)
1787 d.addCallback(_check_success)
1788 d.addCallback(lambda ignored:
# NOTE(review): missing call line (orig. 1789, presumably _write_share(mw2)).
1790 d.addCallback(_check_failure)
# NOTE(review): trailing `return d` (orig. ~1791) is missing from this listing.
1794 def test_invalid_salt_size(self):
1795 # Salts need to be 16 bytes in size. Writes that attempt to
1796 # write more or less than this should be rejected.
1797 mw = self._make_new_mw("si1", 0)
1798 invalid_salt = "a" * 17 # 17 bytes
1799 another_invalid_salt = "b" * 15 # 15 bytes
1800 d = defer.succeed(None)
# Both the oversized and undersized salts must raise LayoutInvalid.
1801 d.addCallback(lambda ignored:
1802 self.shouldFail(LayoutInvalid, "salt too big",
# (orig. 1803, the `None` substring argument to shouldFail, is missing)
1804 mw.put_block, self.block, 0, invalid_salt))
1805 d.addCallback(lambda ignored:
1806 self.shouldFail(LayoutInvalid, "salt too small",
# (orig. 1807, the `None` substring argument to shouldFail, is missing)
1808 mw.put_block, self.block, 0,
1809 another_invalid_salt))
# NOTE(review): trailing `return d` (orig. ~1810) is missing from this listing.
1813 def test_write_test_vectors(self):
1814 # If we give the write proxy a bogus test vector at
1815 # any point during the process, it should fail to write when we
1817 def _check_failure(results):
1818 self.failUnlessEqual(len(results), 2)
1822 def _check_success(results):
1823 self.failUnlessEqual(len(results), 2)
1825 self.failUnless(results)
1827 mw = self._make_new_mw("si1", 0)
1828 mw.set_checkstring("this is a lie")
1830 mw.put_block(self.block, i, self.salt)
1831 mw.put_encprivkey(self.encprivkey)
1832 mw.put_blockhashes(self.block_hash_tree)
1833 mw.put_sharehashes(self.share_hash_chain)
1834 mw.put_root_hash(self.root_hash)
1835 mw.put_signature(self.signature)
1836 mw.put_verification_key(self.verification_key)
1837 d = mw.finish_publishing()
1838 d.addCallback(_check_failure)
1839 d.addCallback(lambda ignored:
1840 mw.set_checkstring(""))
1841 d.addCallback(lambda ignored:
1842 mw.finish_publishing())
1843 d.addCallback(_check_success)
def serialize_blockhashes(self, blockhashes):
    """Return the concatenation of *blockhashes*, in order (MDMF wire form)."""
    pieces = [h for h in blockhashes]
    return "".join(pieces)
1851 def serialize_sharehashes(self, sharehashes):
# Serialize a {share_num: 32-byte hash} dict as a sequence of
# (uint16 index, 32-byte hash) records, sorted by index — the on-disk
# share-hash-chain encoding.
1852 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1853 for i in sorted(sharehashes.keys())])
# NOTE(review): the trailing `return ret` (orig. ~1854) is missing from this
# listing.
1857 def test_write(self):
1858 # This translates to a file with 6 6-byte segments, and with 2-byte
1860 mw = self._make_new_mw("si1", 0)
1861 # Test writing some blocks.
1862 read = self.ss.remote_slot_readv
1863 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1864 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1865 PRIVATE_KEY_SIZE + \
1867 VERIFICATION_KEY_SIZE + \
1868 SHARE_HASH_CHAIN_SIZE
1869 written_block_size = 2 + len(self.salt)
1870 written_block = self.block + self.salt
1872 mw.put_block(self.block, i, self.salt)
1874 mw.put_encprivkey(self.encprivkey)
1875 mw.put_blockhashes(self.block_hash_tree)
1876 mw.put_sharehashes(self.share_hash_chain)
1877 mw.put_root_hash(self.root_hash)
1878 mw.put_signature(self.signature)
1879 mw.put_verification_key(self.verification_key)
1880 d = mw.finish_publishing()
1881 def _check_publish(results):
1882 self.failUnlessEqual(len(results), 2)
1883 result, ign = results
1884 self.failUnless(result, "publish failed")
1886 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1887 {0: [written_block]})
1889 self.failUnlessEqual(len(self.encprivkey), 7)
1890 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1891 {0: [self.encprivkey]})
1893 expected_block_hash_offset = expected_sharedata_offset + \
1894 (6 * written_block_size)
1895 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1896 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1897 {0: [self.block_hash_tree_s]})
1899 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1900 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1901 {0: [self.share_hash_chain_s]})
1903 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1904 {0: [self.root_hash]})
1905 expected_signature_offset = expected_share_hash_offset + \
1906 len(self.share_hash_chain_s)
1907 self.failUnlessEqual(len(self.signature), 9)
1908 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1909 {0: [self.signature]})
1911 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1912 self.failUnlessEqual(len(self.verification_key), 6)
1913 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1914 {0: [self.verification_key]})
1916 signable = mw.get_signable()
1917 verno, seq, roothash, k, n, segsize, datalen = \
1918 struct.unpack(">BQ32sBBQQ",
1920 self.failUnlessEqual(verno, 1)
1921 self.failUnlessEqual(seq, 0)
1922 self.failUnlessEqual(roothash, self.root_hash)
1923 self.failUnlessEqual(k, 3)
1924 self.failUnlessEqual(n, 10)
1925 self.failUnlessEqual(segsize, 6)
1926 self.failUnlessEqual(datalen, 36)
1927 expected_eof_offset = expected_block_hash_offset + \
1928 len(self.block_hash_tree_s)
1930 # Check the version number to make sure that it is correct.
1931 expected_version_number = struct.pack(">B", 1)
1932 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1933 {0: [expected_version_number]})
1934 # Check the sequence number to make sure that it is correct
1935 expected_sequence_number = struct.pack(">Q", 0)
1936 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1937 {0: [expected_sequence_number]})
1938 # Check that the encoding parameters (k, N, segement size, data
1939 # length) are what they should be. These are 3, 10, 6, 36
1940 expected_k = struct.pack(">B", 3)
1941 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1943 expected_n = struct.pack(">B", 10)
1944 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1946 expected_segment_size = struct.pack(">Q", 6)
1947 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1948 {0: [expected_segment_size]})
1949 expected_data_length = struct.pack(">Q", 36)
1950 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1951 {0: [expected_data_length]})
1952 expected_offset = struct.pack(">Q", expected_private_key_offset)
1953 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1954 {0: [expected_offset]})
1955 expected_offset = struct.pack(">Q", expected_share_hash_offset)
1956 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1957 {0: [expected_offset]})
1958 expected_offset = struct.pack(">Q", expected_signature_offset)
1959 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1960 {0: [expected_offset]})
1961 expected_offset = struct.pack(">Q", expected_verification_key_offset)
1962 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1963 {0: [expected_offset]})
1964 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1965 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1966 {0: [expected_offset]})
1967 expected_offset = struct.pack(">Q", expected_sharedata_offset)
1968 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1969 {0: [expected_offset]})
1970 expected_offset = struct.pack(">Q", expected_block_hash_offset)
1971 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1972 {0: [expected_offset]})
1973 expected_offset = struct.pack(">Q", expected_eof_offset)
1974 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1975 {0: [expected_offset]})
1976 d.addCallback(_check_publish)
1979 def _make_new_mw(self, si, share, datalength=36):
1980 # This is a file of size 36 bytes. Since it has a segment
1981 # size of 6, we know that it has 6 byte segments, which will
1982 # be split into blocks of 2 bytes because our FEC k
# (continuation of the comment, orig. 1983, is missing: "...parameter is 3.")
1984 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
# NOTE(review): the remaining constructor arguments (orig. ~1985, presumably
# segment size 6 and `datalength`) and the `return mw` line are missing from
# this listing — confirm against the full source.
1989 def test_write_rejected_with_too_many_blocks(self):
1990 mw = self._make_new_mw("si0", 0)
1992 # Try writing too many blocks. We should not be able to write
# (continuation of the comment, orig. 1993, is missing)
1994 # blocks into each share.
1995 d = defer.succeed(None)
# NOTE(review): the loop header (orig. ~1996, presumably
# `for i in xrange(6):`) is missing from this listing.
1997 d.addCallback(lambda ignored, i=i:
1998 mw.put_block(self.block, i, self.salt))
# Block index 7 is out of range for a 6-segment file -> LayoutInvalid.
1999 d.addCallback(lambda ignored:
2000 self.shouldFail(LayoutInvalid, "too many blocks",
# (orig. 2001, the `None` substring argument to shouldFail, is missing)
2002 mw.put_block, self.block, 7, self.salt))
# NOTE(review): trailing `return d` (orig. ~2003) is missing from this listing.
2006 def test_write_rejected_with_invalid_salt(self):
2007 # Try writing an invalid salt. Salts are 16 bytes -- any more or
2008 # less should cause an error.
2009 mw = self._make_new_mw("si1", 0)
2010 bad_salt = "a" * 17 # 17 bytes
2011 d = defer.succeed(None)
2012 d.addCallback(lambda ignored:
2013 self.shouldFail(LayoutInvalid, "test_invalid_salt",
2014 None, mw.put_block, self.block, 7, bad_salt))
# NOTE(review): trailing `return d` (orig. ~2015) is missing from this listing.
2018 def test_write_rejected_with_invalid_root_hash(self):
2019 # Try writing an invalid root hash. This should be SHA256d, and
2020 # 32 bytes long as a result.
2021 mw = self._make_new_mw("si2", 0)
2022 # 17 bytes != 32 bytes
2023 invalid_root_hash = "a" * 17
2024 d = defer.succeed(None)
2025 # Before this test can work, we need to put some blocks + salts,
2026 # a block hash tree, and a share hash tree. Otherwise, we'll see
2027 # failures that match what we are looking for, but are caused by
2028 # the constraints imposed on operation ordering.
# NOTE(review): the loop header (orig. ~2029, presumably
# `for i in xrange(6):`) is missing from this listing.
2030 d.addCallback(lambda ignored, i=i:
2031 mw.put_block(self.block, i, self.salt))
2032 d.addCallback(lambda ignored:
2033 mw.put_encprivkey(self.encprivkey))
2034 d.addCallback(lambda ignored:
2035 mw.put_blockhashes(self.block_hash_tree))
2036 d.addCallback(lambda ignored:
2037 mw.put_sharehashes(self.share_hash_chain))
# With ordering constraints satisfied, a wrong-length root hash must be the
# thing that triggers LayoutInvalid.
2038 d.addCallback(lambda ignored:
2039 self.shouldFail(LayoutInvalid, "invalid root hash",
2040 None, mw.put_root_hash, invalid_root_hash))
# NOTE(review): trailing `return d` (orig. ~2041) is missing from this listing.
    def test_write_rejected_with_invalid_blocksize(self):
        """Blocks smaller or larger than the implied 2-byte blocksize must be
        rejected, except for a correctly-sized tail segment."""
        # The blocksize implied by the writer that we get from
        # _make_new_mw is 2bytes -- any more or any less than this
        # should be cause for failure, unless it is the tail segment, in
        # which case it may not be failure.
        # NOTE(review): several lines are missing from this excerpt: the
        # definitions of ``invalid_block`` and ``valid_block``, a
        # ``for i in xrange(...)`` loop header, some call continuations, and
        # the final ``return d`` -- confirm against upstream before editing.
        mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
        # 1 bytes != 2 bytes
        d = defer.succeed(None)
        d.addCallback(lambda ignored, invalid_block=invalid_block:
            self.shouldFail(LayoutInvalid, "test blocksize too small",
                            None, mw.put_block, invalid_block, 0,
        invalid_block = invalid_block * 3
        # 3 bytes != 2 bytes
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "test blocksize too large",
                            mw.put_block, invalid_block, 0, self.salt))
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
        # Try to put an invalid tail segment
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "test invalid tail segment",
                            mw.put_block, self.block, 5, self.salt))
        d.addCallback(lambda ignored:
            mw.put_block(valid_block, 5, self.salt))
    def test_write_enforces_order_constraints(self):
        """The MDMF writer must enforce its operation-ordering contract."""
        # We require that the MDMFSlotWriteProxy be interacted with in a
        # 1: write blocks and salts
        # 2: Write the encrypted private key
        # 3: Write the block hashes
        # 4: Write the share hashes
        # 5: Write the root hash and salt hash
        # 6: Write the signature and verification key
        # 7: Write the file.
        # Some of these can be performed out-of-order, and some can't.
        # The dependencies that I want to test here are:
        #  - Private key before block hashes
        #  - share hashes and block hashes before root hash
        #  - root hash before signature
        #  - signature before verification key
        # NOTE(review): a ``for i in xrange(...)`` loop header, at least one
        # shouldFail argument line, and the final ``return d`` appear to be
        # missing from this excerpt.
        mw0 = self._make_new_mw("si0", 0)
        d = defer.succeed(None)
        d.addCallback(lambda ignored, i=i:
            mw0.put_block(self.block, i, self.salt))
        # Try to write the share hash chain without writing the
        # encrypted private key
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "share hash chain before "
                            mw0.put_sharehashes, self.share_hash_chain))
        # Write the private key.
        d.addCallback(lambda ignored:
            mw0.put_encprivkey(self.encprivkey))
        # Now write the block hashes and try again
        d.addCallback(lambda ignored:
            mw0.put_blockhashes(self.block_hash_tree))
        # We haven't yet put the root hash on the share, so we shouldn't
        # be able to sign it.
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "signature before root hash",
                            None, mw0.put_signature, self.signature))
        d.addCallback(lambda ignored:
            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
        # ..and, since that fails, we also shouldn't be able to put the
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "key before signature",
                            None, mw0.put_verification_key,
                            self.verification_key))
        # Now write the share hashes.
        d.addCallback(lambda ignored:
            mw0.put_sharehashes(self.share_hash_chain))
        # We should be able to write the root hash now too
        d.addCallback(lambda ignored:
            mw0.put_root_hash(self.root_hash))
        # We should still be unable to put the verification key
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "key before signature",
                            None, mw0.put_verification_key,
                            self.verification_key))
        d.addCallback(lambda ignored:
            mw0.put_signature(self.signature))
        # We shouldn't be able to write the offsets to the remote server
        # until the offset table is finished; IOW, until we have written
        # the verification key.
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "offsets before verification key",
                            mw0.finish_publishing))
        d.addCallback(lambda ignored:
            mw0.put_verification_key(self.verification_key))
    def test_end_to_end(self):
        """Round-trip: publish a share with MDMFSlotWriteProxy, then read every
        field back with MDMFSlotReadProxy and compare with the originals."""
        mw = self._make_new_mw("si1", 0)
        # Write a share using the mutable writer, and make sure that the
        # reader knows how to read everything back to us.
        # NOTE(review): a ``for i in xrange(...)`` loop header, several getter
        # calls (e.g. ``mr.get_signature()``/``get_seqnum()``/``get_root_hash()``),
        # and the final ``return d`` appear to be missing from this excerpt.
        d = defer.succeed(None)
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
        d.addCallback(lambda ignored:
            mw.put_encprivkey(self.encprivkey))
        d.addCallback(lambda ignored:
            mw.put_blockhashes(self.block_hash_tree))
        d.addCallback(lambda ignored:
            mw.put_sharehashes(self.share_hash_chain))
        d.addCallback(lambda ignored:
            mw.put_root_hash(self.root_hash))
        d.addCallback(lambda ignored:
            mw.put_signature(self.signature))
        d.addCallback(lambda ignored:
            mw.put_verification_key(self.verification_key))
        d.addCallback(lambda ignored:
            mw.finish_publishing())
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        def _check_block_and_salt((block, salt)):
            self.failUnlessEqual(block, self.block)
            self.failUnlessEqual(salt, self.salt)
        d.addCallback(lambda ignored, i=i:
            mr.get_block_and_salt(i))
        d.addCallback(_check_block_and_salt)
        d.addCallback(lambda ignored:
            mr.get_encprivkey())
        d.addCallback(lambda encprivkey:
            self.failUnlessEqual(self.encprivkey, encprivkey))
        d.addCallback(lambda ignored:
            mr.get_blockhashes())
        d.addCallback(lambda blockhashes:
            self.failUnlessEqual(self.block_hash_tree, blockhashes))
        d.addCallback(lambda ignored:
            mr.get_sharehashes())
        d.addCallback(lambda sharehashes:
            self.failUnlessEqual(self.share_hash_chain, sharehashes))
        d.addCallback(lambda ignored:
        d.addCallback(lambda signature:
            self.failUnlessEqual(signature, self.signature))
        d.addCallback(lambda ignored:
            mr.get_verification_key())
        d.addCallback(lambda verification_key:
            self.failUnlessEqual(verification_key, self.verification_key))
        d.addCallback(lambda ignored:
        d.addCallback(lambda seqnum:
            self.failUnlessEqual(seqnum, 0))
        d.addCallback(lambda ignored:
        d.addCallback(lambda root_hash:
            self.failUnlessEqual(self.root_hash, root_hash))
        d.addCallback(lambda ignored:
            mr.get_encoding_parameters())
        def _check_encoding_parameters((k, n, segsize, datalen)):
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            self.failUnlessEqual(segsize, 6)
            self.failUnlessEqual(datalen, 36)
        d.addCallback(_check_encoding_parameters)
        d.addCallback(lambda ignored:
            mr.get_checkstring())
        d.addCallback(lambda checkstring:
            self.failUnlessEqual(checkstring, mw.get_checkstring()))
    def test_is_sdmf(self):
        """is_sdmf() must report True for a share written in SDMF format."""
        # The MDMFSlotReadProxy should also know how to read SDMF files,
        # since it will encounter them on the grid. Callers use the
        # is_sdmf method to test this.
        self.write_sdmf_share_to_server("si1")
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        # NOTE(review): the line creating ``d`` (likely ``d = mr.is_sdmf()``)
        # and the final ``return d`` appear to be missing from this excerpt.
        d.addCallback(lambda issdmf:
            self.failUnless(issdmf))
    def test_reads_sdmf(self):
        """MDMFSlotReadProxy must read every field of an SDMF-format share."""
        # The slot read proxy should, naturally, know how to tell us
        # about data in the SDMF format
        # NOTE(review): several call lines (e.g. ``mr.is_sdmf()``,
        # ``mr.get_signature()``, getter continuations) and the final
        # ``return d`` appear to be missing from this excerpt.
        self.write_sdmf_share_to_server("si1")
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
        d.addCallback(lambda issdmf:
            self.failUnless(issdmf))
        # What do we need to read?
        d.addCallback(lambda ignored:
            mr.get_block_and_salt(0))
        def _check_block_and_salt(results):
            block, salt = results
            # Our original file is 36 bytes long. Then each share is 12
            # bytes in size. The share is composed entirely of the
            # letter a. self.block contains 2 as, so 6 * self.block is
            # what we are looking for.
            self.failUnlessEqual(block, self.block * 6)
            self.failUnlessEqual(salt, self.salt)
        d.addCallback(_check_block_and_salt)
        d.addCallback(lambda ignored:
            mr.get_blockhashes())
        d.addCallback(lambda blockhashes:
            self.failUnlessEqual(self.block_hash_tree,
        d.addCallback(lambda ignored:
            mr.get_sharehashes())
        d.addCallback(lambda sharehashes:
            self.failUnlessEqual(self.share_hash_chain,
        d.addCallback(lambda ignored:
            mr.get_encprivkey())
        d.addCallback(lambda encprivkey:
            self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
        d.addCallback(lambda ignored:
            mr.get_verification_key())
        d.addCallback(lambda verification_key:
            self.failUnlessEqual(verification_key,
                                 self.verification_key,
        d.addCallback(lambda ignored:
        d.addCallback(lambda signature:
            self.failUnlessEqual(signature, self.signature, signature))
        # - The sequence number
        d.addCallback(lambda ignored:
        d.addCallback(lambda seqnum:
            self.failUnlessEqual(seqnum, 0, seqnum))
        d.addCallback(lambda ignored:
        d.addCallback(lambda root_hash:
            self.failUnlessEqual(root_hash, self.root_hash, root_hash))
    def test_only_reads_one_segment_sdmf(self):
        """Requesting segment 1 of a single-segment SDMF share must fail."""
        # SDMF shares have only one segment, so it doesn't make sense to
        # read more segments than that. The reader should know this and
        # complain if we try to do that.
        # NOTE(review): an ``mr.is_sdmf()`` call line, a shouldFail argument
        # line, and the final ``return d`` appear to be missing.
        self.write_sdmf_share_to_server("si1")
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
        d.addCallback(lambda issdmf:
            self.failUnless(issdmf))
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "test bad segment",
                            mr.get_block_and_salt, 1))
    def test_read_with_prefetched_mdmf_data(self):
        """Prefetched MDMF data must satisfy verinfo reads with zero remote
        round trips; insufficient data must fall back to remote reads."""
        # The MDMFSlotReadProxy will prefill certain fields if you pass
        # it data that you have already fetched. This is useful for
        # cases like the Servermap, which prefetches ~2kb of data while
        # finding out which shares are on the remote peer so that it
        # doesn't waste round trips.
        # NOTE(review): the ``return mr`` of ``_make_mr``, the
        # ``mr.get_verinfo()`` call, the verinfo tuple-unpacking lines, the
        # ``struct.pack`` continuation arguments, and the final ``return d``
        # appear to be missing from this excerpt.
        mdmf_data = self.build_test_mdmf_share()
        self.write_test_share_to_server("si1")
        def _make_mr(ignored, length):
            mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
        d = defer.succeed(None)
        # This should be enough to fill in both the encoding parameters
        # and the table of offsets, which will complete the version
        # information tuple.
        d.addCallback(_make_mr, 123)
        d.addCallback(lambda mr:
        def _check_verinfo(verinfo):
            self.failUnless(verinfo)
            self.failUnlessEqual(len(verinfo), 9)
            self.failUnlessEqual(seqnum, 0)
            self.failUnlessEqual(root_hash, self.root_hash)
            self.failUnlessEqual(segsize, 6)
            self.failUnlessEqual(datalen, 36)
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
            self.failUnlessEqual(expected_prefix, prefix)
            self.failUnlessEqual(self.rref.read_count, 0)
        d.addCallback(_check_verinfo)
        # This is not enough data to read a block and a share, so the
        # wrapper should attempt to read this from the remote server.
        d.addCallback(_make_mr, 123)
        d.addCallback(lambda mr:
            mr.get_block_and_salt(0))
        def _check_block_and_salt((block, salt)):
            self.failUnlessEqual(block, self.block)
            self.failUnlessEqual(salt, self.salt)
            self.failUnlessEqual(self.rref.read_count, 1)
        # This should be enough data to read one block.
        d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
        d.addCallback(lambda mr:
            mr.get_block_and_salt(0))
        d.addCallback(_check_block_and_salt)
    def test_read_with_prefetched_sdmf_data(self):
        """Same prefetch behavior as the MDMF variant, but for SDMF shares."""
        # NOTE(review): the ``return mr`` of ``_make_mr``, the
        # ``mr.get_verinfo()`` call, the verinfo tuple-unpacking lines, the
        # ``struct.pack`` continuation arguments, and the final ``return d``
        # appear to be missing from this excerpt.
        sdmf_data = self.build_test_sdmf_share()
        self.write_sdmf_share_to_server("si1")
        def _make_mr(ignored, length):
            mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
        d = defer.succeed(None)
        # This should be enough to get us the encoding parameters,
        # offset table, and everything else we need to build a verinfo
        d.addCallback(_make_mr, 123)
        d.addCallback(lambda mr:
        def _check_verinfo(verinfo):
            self.failUnless(verinfo)
            self.failUnlessEqual(len(verinfo), 9)
            self.failUnlessEqual(seqnum, 0)
            self.failUnlessEqual(root_hash, self.root_hash)
            self.failUnlessEqual(salt, self.salt)
            self.failUnlessEqual(segsize, 36)
            self.failUnlessEqual(datalen, 36)
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            expected_prefix = struct.pack(SIGNED_PREFIX,
            self.failUnlessEqual(expected_prefix, prefix)
            self.failUnlessEqual(self.rref.read_count, 0)
        d.addCallback(_check_verinfo)
        # This shouldn't be enough to read any share data.
        d.addCallback(_make_mr, 123)
        d.addCallback(lambda mr:
            mr.get_block_and_salt(0))
        def _check_block_and_salt((block, salt)):
            self.failUnlessEqual(block, self.block * 6)
            self.failUnlessEqual(salt, self.salt)
            # TODO: Fix the read routine so that it reads only the data
            # that it has cached if it can't read all of it.
            self.failUnlessEqual(self.rref.read_count, 2)
        # This should be enough to read share data.
        d.addCallback(_make_mr, self.offsets['share_data'])
        d.addCallback(lambda mr:
            mr.get_block_and_salt(0))
        d.addCallback(_check_block_and_salt)
    def test_read_with_empty_mdmf_file(self):
        """An empty MDMF share must report zero segsize/datalen and reject
        block reads."""
        # Some tests upload a file with no contents to test things
        # unrelated to the actual handling of the content of the file.
        # The reader should behave intelligently in these cases.
        # NOTE(review): a shouldFail argument line and the final ``return d``
        # appear to be missing from this excerpt.
        self.write_test_share_to_server("si1", empty=True)
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        # We should be able to get the encoding parameters, and they
        # should be correct.
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
            mr.get_encoding_parameters())
        def _check_encoding_parameters(params):
            self.failUnlessEqual(len(params), 4)
            k, n, segsize, datalen = params
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            self.failUnlessEqual(segsize, 0)
            self.failUnlessEqual(datalen, 0)
        d.addCallback(_check_encoding_parameters)
        # We should not be able to fetch a block, since there are no
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "get block on empty file",
                            mr.get_block_and_salt, 0))
    def test_read_with_empty_sdmf_file(self):
        """Same as the empty-MDMF case, for an empty SDMF share."""
        # NOTE(review): a shouldFail argument line and the final ``return d``
        # appear to be missing from this excerpt.
        self.write_sdmf_share_to_server("si1", empty=True)
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        # We should be able to get the encoding parameters, and they
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
            mr.get_encoding_parameters())
        def _check_encoding_parameters(params):
            self.failUnlessEqual(len(params), 4)
            k, n, segsize, datalen = params
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            self.failUnlessEqual(segsize, 0)
            self.failUnlessEqual(datalen, 0)
        d.addCallback(_check_encoding_parameters)
        # It does not make sense to get a block in this format, so we
        # should not be able to.
        d.addCallback(lambda ignored:
            self.shouldFail(LayoutInvalid, "get block on an empty file",
                            mr.get_block_and_salt, 0))
    def test_verinfo_with_sdmf_file(self):
        """get_verinfo() on an SDMF share must return the expected 9-tuple."""
        # NOTE(review): the ``mr.get_verinfo()`` call line, the verinfo
        # tuple-unpacking lines, the ``struct.pack`` continuation arguments,
        # and the final ``return d`` appear to be missing from this excerpt.
        self.write_sdmf_share_to_server("si1")
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        # We should be able to get the version information.
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
        def _check_verinfo(verinfo):
            self.failUnless(verinfo)
            self.failUnlessEqual(len(verinfo), 9)
            self.failUnlessEqual(seqnum, 0)
            self.failUnlessEqual(root_hash, self.root_hash)
            self.failUnlessEqual(salt, self.salt)
            self.failUnlessEqual(segsize, 36)
            self.failUnlessEqual(datalen, 36)
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            expected_prefix = struct.pack(">BQ32s16s BBQQ",
            self.failUnlessEqual(prefix, expected_prefix)
            self.failUnlessEqual(offsets, self.offsets)
        d.addCallback(_check_verinfo)
    def test_verinfo_with_mdmf_file(self):
        """get_verinfo() on an MDMF share must return the expected 9-tuple."""
        # NOTE(review): the ``mr.get_verinfo()`` call line, the verinfo
        # tuple-unpacking lines, the ``struct.pack`` continuation arguments,
        # and the final ``return d`` appear to be missing from this excerpt.
        self.write_test_share_to_server("si1")
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        d = defer.succeed(None)
        d.addCallback(lambda ignored:
        def _check_verinfo(verinfo):
            self.failUnless(verinfo)
            self.failUnlessEqual(len(verinfo), 9)
            self.failUnlessEqual(seqnum, 0)
            self.failUnlessEqual(root_hash, self.root_hash)
            self.failUnlessEqual(segsize, 6)
            self.failUnlessEqual(datalen, 36)
            self.failUnlessEqual(k, 3)
            self.failUnlessEqual(n, 10)
            expected_prefix = struct.pack(">BQ32s BBQQ",
            self.failUnlessEqual(prefix, expected_prefix)
            self.failUnlessEqual(offsets, self.offsets)
        d.addCallback(_check_verinfo)
    def test_sdmf_writer(self):
        """SDMFSlotWriteProxy must buffer all puts locally and only hit the
        server once, at finish_publishing()."""
        # Go through the motions of writing an SDMF share to the storage
        # server. Then read the storage server to see that the share got
        # written in the way that we think it should have.
        # We do this first so that the necessary instance variables get
        # set the way we want them for the tests below.
        # NOTE(review): the SDMFSlotWriteProxy constructor continuation
        # arguments, the ``def _then(ignored):`` header (the three assertions
        # below ``d = sdmfr.finish_publishing()`` belong inside it), and the
        # final ``return d`` appear to be missing from this excerpt.
        data = self.build_test_sdmf_share()
        sdmfr = SDMFSlotWriteProxy(0,
        # Put the block and salt.
        sdmfr.put_block(self.blockdata, 0, self.salt)
        # Put the encprivkey
        sdmfr.put_encprivkey(self.encprivkey)
        # Put the block and share hash chains
        sdmfr.put_blockhashes(self.block_hash_tree)
        sdmfr.put_sharehashes(self.share_hash_chain)
        sdmfr.put_root_hash(self.root_hash)
        sdmfr.put_signature(self.signature)
        # Put the verification key
        sdmfr.put_verification_key(self.verification_key)
        # Now check to make sure that nothing has been written yet.
        self.failUnlessEqual(self.rref.write_count, 0)
        # Now finish publishing
        d = sdmfr.finish_publishing()
        self.failUnlessEqual(self.rref.write_count, 1)
        read = self.ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
        d.addCallback(_then)
    def test_sdmf_writer_preexisting_share(self):
        """Writing over an existing SDMF share must fail until the writer's
        checkstring is set to match the share on disk, then succeed."""
        # NOTE(review): the SDMFSlotWriteProxy constructor continuation
        # arguments, the ``def _then(results):`` header (the ``results``
        # lines below ``d = sdmfw.finish_publishing()`` belong inside it),
        # and the final ``return d`` appear to be missing from this excerpt.
        data = self.build_test_sdmf_share()
        self.write_sdmf_share_to_server("si1")
        # Now there is a share on the storage server. To successfully
        # write, we need to set the checkstring correctly. When we
        # don't, no write should occur.
        sdmfw = SDMFSlotWriteProxy(0,
        sdmfw.put_block(self.blockdata, 0, self.salt)
        # Put the encprivkey
        sdmfw.put_encprivkey(self.encprivkey)
        # Put the block and share hash chains
        sdmfw.put_blockhashes(self.block_hash_tree)
        sdmfw.put_sharehashes(self.share_hash_chain)
        sdmfw.put_root_hash(self.root_hash)
        sdmfw.put_signature(self.signature)
        # Put the verification key
        sdmfw.put_verification_key(self.verification_key)
        # We shouldn't have a checkstring yet
        self.failUnlessEqual(sdmfw.get_checkstring(), "")
        d = sdmfw.finish_publishing()
        self.failIf(results[0])
        # this is the correct checkstring
        self._expected_checkstring = results[1][0][0]
        return self._expected_checkstring
        d.addCallback(_then)
        d.addCallback(sdmfw.set_checkstring)
        d.addCallback(lambda ignored:
            sdmfw.get_checkstring())
        d.addCallback(lambda checkstring:
            self.failUnlessEqual(checkstring, self._expected_checkstring))
        d.addCallback(lambda ignored:
            sdmfw.finish_publishing())
        def _then_again(results):
            self.failUnless(results[0])
            read = self.ss.remote_slot_readv
            self.failUnlessEqual(read("si1", [0], [(1, 8)]),
                                 {0: [struct.pack(">Q", 1)]})
            self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
        d.addCallback(_then_again)
class Stats(unittest.TestCase):
    """Exercises StorageServer's latency-sample bookkeeping and percentile
    reporting (add_latency / get_latencies)."""
    # NOTE(review): the ``def setUp(self):`` and ``def tearDown(self):``
    # headers appear to be missing from this excerpt; the next three lines
    # are their orphaned bodies.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        # NOTE(review): ``return basedir`` appears to be missing here.
        basedir = os.path.join("storage", "Server", name)

    def create(self, name):
        # NOTE(review): ``return ss`` appears to be missing here.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)

    def test_latencies(self):
        """Feed known samples per category and check the percentile output."""
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
            # NOTE(review): upstream has separate smaller loops for the
            # "write" and "cancel" samples (the assertions below expect 20
            # and 10 samples respectively); those loop headers appear to be
            # missing from this excerpt.
            ss.add_latency("write", 1.0 * i)
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
        output = ss.get_latencies()
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))
        # "allocate" keeps only the most recent 1000 of the 10000 samples.
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
        # With only 20 samples, the extreme percentiles are unreportable.
        self.failUnlessEqual(len(ss.latencies["write"]), 20)
        self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
        self.failUnless(output["write"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
        self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
        self.failUnless(output["write"]["99_0_percentile"] is None, output)
        self.failUnless(output["write"]["99_9_percentile"] is None, output)
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
        self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
        self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
        self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
        # A single sample supports no statistics at all.
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(output["get"]["mean"] is None, output)
        self.failUnless(output["get"]["01_0_percentile"] is None, output)
        self.failUnless(output["get"]["10_0_percentile"] is None, output)
        self.failUnless(output["get"]["50_0_percentile"] is None, output)
        self.failUnless(output["get"]["90_0_percentile"] is None, output)
        self.failUnless(output["get"]["95_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_0_percentile"] is None, output)
        self.failUnless(output["get"]["99_9_percentile"] is None, output)
    # NOTE(review): these two statements are the body of the module-level
    # ``remove_tags(s)`` helper; its ``def`` header and ``return s`` line
    # appear to be missing from this excerpt. They replace HTML tags with
    # spaces and collapse whitespace runs so the web-status tests can assert
    # on plain-text substrings.
    s = re.sub(r'<[^>]*>', ' ', s)
    s = re.sub(r'\s+', ' ', s)
class MyBucketCountingCrawler(BucketCountingCrawler):
    """Crawler subclass that lets tests observe prefix completion via the
    ``hook_ds`` list of Deferreds (see test_bucket_counter_eta)."""
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        # NOTE(review): a guard (likely ``if self.hook_ds:``) and the line
        # that fires the popped Deferred appear to be missing from this
        # excerpt.
        d = self.hook_ds.pop(0)
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is the hook-instrumented
    MyBucketCountingCrawler, so tests can watch prefixes finish."""
    def add_bucket_counter(self):
        # Keep the crawler's persisted state next to the rest of the store.
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Exercises the bucket-counting crawler and its web-status rendering."""
    # NOTE(review): the ``def setUp(self):`` / ``def tearDown(self):`` headers
    # appear to be missing from this excerpt; the next three lines are their
    # orphaned bodies.
        self.s = service.MultiService()
        self.s.startService()
        return self.s.stopService()

    def test_bucket_counter(self):
        """Watch the crawler progress from 'not computed yet' to a full count."""
        # NOTE(review): this method is missing several lines in this excerpt
        # (e.g. a ``return d2`` in _check, the ``def _watch():`` header, and
        # the final ``return d``) -- confirm against upstream before editing.
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)
        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                # Not yet; reschedule ourselves for the next reactor turn.
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)
        # now give it enough time to complete a full cycle
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)

    def test_bucket_counter_cleanup(self):
        """Bogus entries injected into crawler state must be purged by the end
        of the cycle."""
        # NOTE(review): like test_bucket_counter, this excerpt is missing a
        # ``return d2`` in the retry branch, the ``def _watch():`` header,
        # and the final ``return d``.
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)
        d = fireEventually()
        def _after_first_prefix(ignored):
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)
        # now give it enough time to complete a cycle
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)

    def test_bucket_counter_eta(self):
        """ETA text should appear in the status page once at least one prefix
        has finished (hooks fired by MyBucketCountingCrawler)."""
        # NOTE(review): lines such as ``d.callback(...)`` at the end of the
        # check helpers and the final ``return d`` appear to be missing from
        # this excerpt.
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)
        d = defer.Deferred()
        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)
        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)
        ss.setServiceParent(self.s)
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """Lease crawler that can pause itself right after the first bucket.

    While ``stop_after_first_bucket`` is set, processing one bucket forces an
    immediate yield (negative cpu_slice) and clears the flag; once cleared,
    subsequent yields restore a generous slice so the cycle finishes fast.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # Trip exactly once: clear the flag, then force an immediate yield.
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
class BrokenStatResults:
    """Bare attribute holder standing in for an os.stat result; the crawler
    below copies stat attributes onto it selectively (omitting st_blocks).
    NOTE(review): the original body (likely just ``pass``) appears to be
    missing from this excerpt; this docstring serves as the class body."""
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    # Simulates a platform whose stat() results lack st_blocks: copies every
    # public attribute of a real stat result onto a BrokenStatResults, except
    # st_blocks.
    # NOTE(review): the ``def stat(self, fn):`` header (and the line calling
    # the real ``os.stat``), plus the loop's ``continue`` statements and the
    # trailing ``return bsr``, appear to be missing from this excerpt.
        bsr = BrokenStatResults()
        for attrname in dir(s):
            if attrname.startswith("_"):
            if attrname == "st_blocks":
            setattr(bsr, attrname, getattr(s, attrname))
class InstrumentedStorageServer(StorageServer):
    # Use the pausable crawler so tests can stop right after the first bucket.
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    # Use the crawler whose stat results omit st_blocks.
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    """Exercises the lease-checking/expiration crawler."""
    # NOTE(review): the ``def setUp(self):`` / ``def tearDown(self):`` headers
    # appear to be missing from this excerpt; the next three lines are their
    # orphaned bodies.
        self.s = service.MultiService()
        self.s.startService()
        return self.s.stopService()
    def make_shares(self, ss):
        """Populate *ss* with two immutable and two mutable shares, recording
        their storage indexes and lease secrets on self for later tests."""
        # NOTE(review): the inner ``def make(si):`` header, a ``sharenums``
        # definition, and the continuation arguments of the
        # remote_allocate_buckets calls (plus the ``remote_close()`` calls)
        # appear to be missing from this excerpt.
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si))
        def make_mutable(si):
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si),
                    hashutil.tagged_hash("write-enabler", si))
        def make_extra_lease(si, num):
            return (hashutil.tagged_hash("renew-%d" % num, si),
                    hashutil.tagged_hash("cancel-%d" % num, si))
        immutable_si_0, rs0, cs0 = make("\x00" * 16)
        immutable_si_1, rs1, cs1 = make("\x01" * 16)
        rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
        mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
        mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
        rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
        canary = FakeCanary()
        # note: 'tahoe debug dump-share' will not handle this file, since the
        # inner contents are not a valid CHK share
        data = "\xff" * 1000
        a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
        w[0].remote_write(0, data)
        a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
        w[0].remote_write(0, data)
        # second lease on immutable share 1
        ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
        writev = ss.remote_slot_testv_and_readv_and_writev
        writev(mutable_si_2, (we2, rs2, cs2),
               {0: ([], [(0,data)], len(data))}, [])
        writev(mutable_si_3, (we3, rs3, cs3),
               {0: ([], [(0,data)], len(data))}, [])
        # second lease on mutable share 3
        ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
        self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
        self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
        self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
    def test_basic(self):
        """With expiration disabled, a full crawler cycle examines all
        shares, recovers nothing, and records sane state/history plus
        matching web and JSON status."""
        # NOTE(review): this chunk elides a few original lines (crawler
        # speed-up settings after 'lc = ...', the "not-a-share" filename and
        # file open/close, the '_wait' poller header, the 'return d2' /
        # 'return' inside _after_first_bucket, the 's = lc.get_state()'
        # fetch in _after_first_cycle, and the final 'return d') — confirm
        # against the full file before editing.
        basedir = "storage/LeaseCrawler/basic"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)
        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # add a non-sharefile to exercise another code path
        fn = os.path.join(ss.sharedir,
                          storage_index_to_dir(immutable_si_0),
        f.write("I am not a share.\n")
        # this is before the crawl has started, so we're not in a cycle yet
        initial_state = lc.get_state()
        self.failIf(lc.get_progress()["cycle-in-progress"])
        self.failIfIn("cycle-to-date", initial_state)
        self.failIfIn("estimated-remaining-cycle", initial_state)
        self.failIfIn("estimated-current-cycle", initial_state)
        self.failUnlessIn("history", initial_state)
        self.failUnlessEqual(initial_state["history"], {})
        ss.setServiceParent(self.s)

        d = fireEventually()
        # now examine the state right after the first bucket has been
        def _after_first_bucket(ignored):
            initial_state = lc.get_state()
            # re-schedule ourselves until the crawler has entered a cycle
            if "cycle-to-date" not in initial_state:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
            self.failUnlessIn("cycle-to-date", initial_state)
            self.failUnlessIn("estimated-remaining-cycle", initial_state)
            self.failUnlessIn("estimated-current-cycle", initial_state)
            self.failUnlessIn("history", initial_state)
            self.failUnlessEqual(initial_state["history"], {})

            so_far = initial_state["cycle-to-date"]
            self.failUnlessEqual(so_far["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", so_far)
            self.failUnlessIn("lease-age-histogram", so_far)
            lah = so_far["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            # one brand-new lease seen so far
            self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
            self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
            self.failUnlessEqual(so_far["corrupt-shares"], [])
            sr1 = so_far["space-recovered"]
            self.failUnlessEqual(sr1["examined-buckets"], 1)
            self.failUnlessEqual(sr1["examined-shares"], 1)
            self.failUnlessEqual(sr1["actual-shares"], 0)
            self.failUnlessEqual(sr1["configured-diskbytes"], 0)
            self.failUnlessEqual(sr1["original-sharebytes"], 0)
            left = initial_state["estimated-remaining-cycle"]
            sr2 = left["space-recovered"]
            self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
            self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
            self.failIfEqual(sr2["actual-shares"], None)
            self.failIfEqual(sr2["configured-diskbytes"], None)
            self.failIfEqual(sr2["original-sharebytes"], None)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            self.failUnlessIn("So far, this cycle has examined "
                              "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
            self.failUnlessIn("and has recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable), "
                              "0 B (0 B / 0 B)", s)
            self.failUnlessIn("If expiration were enabled, "
                              "we would have recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B) by now", s)
            self.failUnlessIn("and the remainder of this cycle "
                              "would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("and the whole cycle would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("if we were strictly using each lease's default "
                              "31-day lease lifetime", s)
            self.failUnlessIn("this cycle would be expected to recover: ", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Nothing should have
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failIf("cycle-to-date" in s)
            self.failIf("estimated-remaining-cycle" in s)
            self.failIf("estimated-current-cycle" in s)
            last = s["history"][0]
            self.failUnlessIn("cycle-start-finish-times", last)
            self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
            self.failUnlessEqual(last["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", last)

            self.failUnlessIn("lease-age-histogram", last)
            lah = last["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            # all six leases (4 first leases + 2 extras) are brand-new
            self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )

            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
            self.failUnlessEqual(last["corrupt-shares"], [])

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 0)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 0)
            self.failUnlessEqual(rec["actual-shares"], 0)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 0)
            self.failUnlessEqual(rec["actual-diskbytes"], 0)
            self.failUnlessEqual(rec["original-diskbytes"], 0)
            self.failUnlessEqual(rec["configured-diskbytes"], 0)
            self.failUnlessEqual(rec["actual-sharebytes"], 0)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], 0)

            def _get_sharefile(si):
                return list(ss._iter_share_files(si))[0]
            def count_leases(si):
                return len(list(_get_sharefile(si).get_leases()))
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("recovered: 0 shares, 0 buckets "
                              "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
            self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
                              "(2 mutable / 2 immutable),", s)
            self.failUnlessIn("but expiration was not enabled", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(webstatus))
        def _check_json(json):
            data = simplejson.loads(json)
            self.failUnlessIn("lease-checker", data)
            self.failUnlessIn("lease-checker-progress", data)
        d.addCallback(_check_json)
    def backdate_lease(self, sf, renew_secret, new_expire_time):
        """Force the lease identified by *renew_secret* on sharefile *sf* to
        expire at *new_expire_time*, rewriting the lease record in place.

        Raises IndexError if no lease with that renew secret exists.
        """
        # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
        # "renew" a lease with a new_expire_time that is older than what the
        # current lease has), so we have to reach inside it.
        # NOTE(review): this chunk elides the lines that close the file and
        # return after a successful rewrite — confirm against the full file.
        for i,lease in enumerate(sf.get_leases()):
            if lease.renew_secret == renew_secret:
                lease.expiration_time = new_expire_time
                f = open(sf.home, 'rb+')
                sf._write_lease_record(f, i, lease)
        raise IndexError("unable to renew non-existent lease")
    def test_expire_age(self):
        """With expiration_mode="age" and a 2000s override, leases older than
        2000s are expired; shares whose only lease was backdated get
        deleted, shares with a fresh extra lease survive."""
        # NOTE(review): this chunk elides a few original lines (the
        # 'now = time.time()' assignment, the '_wait' poller header,
        # the early 'return' lines inside _after_first_bucket, the
        # 's = lc.get_state()' fetch in _after_first_cycle, and the final
        # 'return d') — confirm against the full file before editing.
        basedir = "storage/LeaseCrawler/expire_age"
        fileutil.make_dirs(basedir)
        # setting expiration_time to 2000 means that any lease which is more
        # than 2000s old will be expired.
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="age",
                                       expiration_override_lease_duration=2000)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)
        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like it expired already (age=1000s).
        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            # re-schedule ourselves until the crawler has entered a cycle
            if not p["cycle-in-progress"]:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)
            last = s["history"][0]
            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("age", 2000, None, ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 2)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 2)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], size)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
            self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
    def test_expire_cutoff_date(self):
        """With expiration_mode="cutoff-date", leases last renewed before the
        cutoff are expired; shares whose only lease was backdated get
        deleted, shares with a fresh extra lease survive."""
        # NOTE(review): this chunk elides a few original lines (the
        # 'now = time.time()' assignment, the '_wait' poller header,
        # the early 'return' lines inside _after_first_bucket, the
        # 's = lc.get_state()' fetch and the expected value continuation of
        # the leases-per-share-histogram assertion in _after_first_cycle,
        # and the final 'return d') — confirm against the full file before
        # editing.
        basedir = "storage/LeaseCrawler/expire_cutoff_date"
        fileutil.make_dirs(basedir)
        # setting cutoff-date to 2000 seconds ago means that any lease which
        # is more than 2000s old will be expired.
        then = int(now - 2000)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="cutoff-date",
                                       expiration_cutoff_date=then)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)
        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like was renewed 3000s ago. To achieve
        # this, we need to set the expiration time to now-3000+31days. This
        # will change when the lease format is improved to contain both
        # create/renew time and duration.
        new_expiration_time = now - 3000 + 31*24*60*60

        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)
        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            # re-schedule ourselves until the crawler has entered a cycle
            if not p["cycle-in-progress"]:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)
            last = s["history"][0]
            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("cutoff-date", None, then,
                                  ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"],

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled:"
                              " expired leases will be removed", s)
            date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
            substr = "Leases created or last renewed before %s will be considered expired." % date
            self.failUnlessIn(substr, s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
    def test_only_immutable(self):
        """With expiration_sharetypes=("immutable",), only immutable shares
        are expired; mutable shares keep all their leases."""
        # NOTE(review): this chunk elides a few original lines (the
        # 'now = time.time()' assignment, crawler speed-up settings after
        # 'lc = ...', the '_wait' poller header, and the final 'return d') —
        # confirm against the full file before editing.
        basedir = "storage/LeaseCrawler/only_immutable"
        fileutil.make_dirs(basedir)
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("immutable",))
        lc = ss.lease_checker
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            # both immutable shares expired; both mutable shares untouched
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 0)
            self.failUnlessEqual(count_shares(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
        d.addCallback(_check_html)
    def test_only_mutable(self):
        """With expiration_sharetypes=("mutable",), only mutable shares are
        expired; immutable shares keep all their leases."""
        # NOTE(review): this chunk elides a few original lines (the
        # 'now = time.time()' assignment, crawler speed-up settings after
        # 'lc = ...', the '_wait' poller header, and the final 'return d') —
        # confirm against the full file before editing.
        basedir = "storage/LeaseCrawler/only_mutable"
        fileutil.make_dirs(basedir)
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("mutable",))
        lc = ss.lease_checker
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            # both immutable shares untouched; both mutable shares expired
            self.failUnlessEqual(count_shares(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 0)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
        d.addCallback(_check_html)
3628 def test_bad_mode(self):
3629 basedir = "storage/LeaseCrawler/bad_mode"
3630 fileutil.make_dirs(basedir)
3631 e = self.failUnlessRaises(ValueError,
3632 StorageServer, basedir, "\x00" * 20,
3633 expiration_mode="bogus")
3634 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
    def test_parse_duration(self):
        """time_format.parse_duration understands day/month/year suffixes
        (with or without spaces and plural 's')."""
        # NOTE(review): the local DAY/MONTH/YEAR constant definitions are
        # elided from this chunk — confirm against the full file before
        # editing.
        p = time_format.parse_duration
        self.failUnlessEqual(p("7days"), 7*DAY)
        self.failUnlessEqual(p("31day"), 31*DAY)
        self.failUnlessEqual(p("60 days"), 60*DAY)
        self.failUnlessEqual(p("2mo"), 2*MONTH)
        self.failUnlessEqual(p("3 month"), 3*MONTH)
        self.failUnlessEqual(p("2years"), 2*YEAR)
        # unrecognized units are rejected with a descriptive ValueError
        e = self.failUnlessRaises(ValueError, p, "2kumquats")
        self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3650 def test_parse_date(self):
3651 p = time_format.parse_date
3652 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3653 self.failUnlessEqual(p("2009-03-18"), 1237334400)
    def test_limited_history(self):
        """After many cycles, the crawler keeps only the 10 most recent
        entries in its history (cycles 6..15 after 15 finish)."""
        # NOTE(review): this chunk elides a few original lines (crawler
        # speed-up settings after 'lc = ...', the poller's 'return'
        # statements, the state/history fetch ('h = ...') inside _check, and
        # the final 'return d') — confirm against the full file before
        # editing.
        basedir = "storage/LeaseCrawler/limited_history"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker

        # create a few shares, with some leases on them
        self.make_shares(ss)

        ss.setServiceParent(self.s)

        def _wait_until_15_cycles_done():
            last = lc.state["last-cycle-finished"]
            if last is not None and last >= 15:
        d = self.poll(_wait_until_15_cycles_done)

        def _check(ignored):
            self.failUnlessEqual(len(h), 10)
            self.failUnlessEqual(max(h.keys()), 15)
            self.failUnlessEqual(min(h.keys()), 6)
        d.addCallback(_check)
    def test_unpredictable_future(self):
        """Before the first prefix completes, the progress measurer cannot
        predict the cycle, so every space-recovered estimate is None."""
        # NOTE(review): this chunk elides a few original lines (crawler
        # speed-up settings, the 's = lc.get_state()' fetch and early
        # 'return' lines inside _check, and the final 'return d') — confirm
        # against the full file before editing.
        basedir = "storage/LeaseCrawler/unpredictable_future"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.cpu_slice = -1.0 # stop quickly

        self.make_shares(ss)

        ss.setServiceParent(self.s)

        d = fireEventually()
        def _check(ignored):
            # this should fire after the first bucket is complete, but before
            # the first prefix is complete, so the progress-measurer won't
            # think we've gotten far enough to raise our percent-complete
            # above 0%, triggering the cannot-predict-the-future code in
            # expirer.py . This will have to change if/when the
            # progress-measurer gets smart enough to count buckets (we'll
            # have to interrupt it even earlier, before it's finished the
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_check)
            self.failUnlessIn("cycle-to-date", s)
            self.failUnlessIn("estimated-remaining-cycle", s)
            self.failUnlessIn("estimated-current-cycle", s)

            left = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(left["actual-buckets"], None)
            self.failUnlessEqual(left["original-buckets"], None)
            self.failUnlessEqual(left["configured-buckets"], None)
            self.failUnlessEqual(left["actual-shares"], None)
            self.failUnlessEqual(left["original-shares"], None)
            self.failUnlessEqual(left["configured-shares"], None)
            self.failUnlessEqual(left["actual-diskbytes"], None)
            self.failUnlessEqual(left["original-diskbytes"], None)
            self.failUnlessEqual(left["configured-diskbytes"], None)
            self.failUnlessEqual(left["actual-sharebytes"], None)
            self.failUnlessEqual(left["original-sharebytes"], None)
            self.failUnlessEqual(left["configured-sharebytes"], None)

            # NOTE(review): 'full' re-reads "estimated-remaining-cycle";
            # given the variable name and the "estimated-current-cycle"
            # membership assertion above, this looks like a copy-paste slip
            # — confirm whether "estimated-current-cycle" was intended.
            full = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(full["actual-buckets"], None)
            self.failUnlessEqual(full["original-buckets"], None)
            self.failUnlessEqual(full["configured-buckets"], None)
            self.failUnlessEqual(full["actual-shares"], None)
            self.failUnlessEqual(full["original-shares"], None)
            self.failUnlessEqual(full["configured-shares"], None)
            self.failUnlessEqual(full["actual-diskbytes"], None)
            self.failUnlessEqual(full["original-diskbytes"], None)
            self.failUnlessEqual(full["configured-diskbytes"], None)
            self.failUnlessEqual(full["actual-sharebytes"], None)
            self.failUnlessEqual(full["original-sharebytes"], None)
            self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
    def test_no_st_blocks(self):
        """When stat results lack st_blocks, the expirer reports
        diskbytes equal to sharebytes instead of real disk usage."""
        # NOTE(review): this chunk elides a few original lines (the rest of
        # the negative-duration comment, crawler speed-up settings, the
        # '_wait' poller header, the 's = lc.get_state()' fetch inside
        # _check, and the final 'return d') — confirm against the full file
        # before editing.
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_time= means the "configured-"
        # space-recovered counts will be non-zero, since all shares will have
        # make it start sooner than usual.
        lc = ss.lease_checker

        self.make_shares(ss)
        ss.setServiceParent(self.s)
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _check(ignored):
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
    def test_share_corruption(self):
        """Corrupt one share's magic number and verify the lease crawler
        survives: it records the share in 'corrupt-shares', keeps crawling
        the remaining buckets, and the JSON and HTML status views both
        report the corruption.

        NOTE(review): several interior lines are missing from this copy of
        the file (flagged inline below) — restore them from version control
        before running.
        """
        # Tell the poller that crawler errors of these types are expected
        # and should not abort the poll loop.
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
        # NOTE(review): closing "]" of the list above is missing in this copy.
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        # create a few shares, with some leases on them
        self.make_shares(ss)
        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        # NOTE(review): the open() of `fn` (presumably `f = open(fn, "rb+")`)
        # and the matching seek()/close() are missing in this copy.
        f.write("BAD MAGIC")
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        def _after_first_bucket(ignored):
            # NOTE(review): the `s = lc.get_state()` binding appears to be
            # missing here, and the re-poll branch below is missing its
            # `return d2` — confirm against VCS.
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            # the corrupt share is reported as (base32-SI, shnum)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)
        # NOTE(review): the enclosing `def _wait():` header for the next
        # line is missing in this copy.
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))
        def _after_first_cycle(ignored):
            # NOTE(review): `s = lc.get_state()` appears to be missing here.
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)
        # NOTE(review): the cleanup wrapper around flushLoggedErrors and the
        # final `return d` are missing in this copy — confirm against VCS.
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
    def render_json(self, page):
        """Render *page* with ?t=json and return a deferred that fires with
        the JSON text (via WebRenderingMixin.render1).

        NOTE(review): the trailing `return d` appears to be missing from
        this copy — confirm against VCS.
        """
        d = self.render1(page, args={"t": ["json"]})
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
    """Tests for the storage-server web status page (StorageStatus)."""
    # NOTE(review): the `def setUp(self):` and `def tearDown(self):` headers
    # appear to be missing from this copy; the statements below are their
    # bodies (start a MultiService in setUp, stop it in tearDown) —
    # confirm against VCS.
        self.s = service.MultiService()
        self.s.startService()
        return self.s.stopService()
3902 def test_no_server(self):
3903 w = StorageStatus(None)
3904 html = w.renderSynchronously()
3905 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
    def test_status(self):
        """Render the status page for a fresh, writable StorageServer and
        check both the HTML view and the ?t=json stats.

        NOTE(review): a few lines are missing from this copy (presumably the
        `d = self.render1(w)` that starts the deferred chain, an
        `s = data["stats"]` binding inside _check_json, and the final
        `return d`) — restore from VCS.
        """
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            # a fresh server accepts shares and reserves nothing
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
    def render_json(self, page):
        """Render *page* with ?t=json and return a deferred that fires with
        the JSON text (via WebRenderingMixin.render1).

        NOTE(review): the trailing `return d` appears to be missing from
        this copy — confirm against VCS.
        """
        d = self.render1(page, args={"t": ["json"]})
3935 @mock.patch('allmydata.util.fileutil.get_disk_stats')
3936 def test_status_no_disk_stats(self, mock_get_disk_stats):
3937 mock_get_disk_stats.side_effect = AttributeError()
3939 # Some platforms may have no disk stats API. Make sure the code can handle that
3940 # (test runs on all platforms).
3941 basedir = "storage/WebStatus/status_no_disk_stats"
3942 fileutil.make_dirs(basedir)
3943 ss = StorageServer(basedir, "\x00" * 20)
3944 ss.setServiceParent(self.s)
3945 w = StorageStatus(ss)
3946 html = w.renderSynchronously()
3947 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3948 s = remove_tags(html)
3949 self.failUnlessIn("Accepting new shares: Yes", s)
3950 self.failUnlessIn("Total disk space: ?", s)
3951 self.failUnlessIn("Space Available to Tahoe: ?", s)
3952 self.failUnless(ss.get_available_space() is None)
3954 @mock.patch('allmydata.util.fileutil.get_disk_stats')
3955 def test_status_bad_disk_stats(self, mock_get_disk_stats):
3956 mock_get_disk_stats.side_effect = OSError()
3958 # If the API to get disk stats exists but a call to it fails, then the status should
3959 # show that no shares will be accepted, and get_available_space() should be 0.
3960 basedir = "storage/WebStatus/status_bad_disk_stats"
3961 fileutil.make_dirs(basedir)
3962 ss = StorageServer(basedir, "\x00" * 20)
3963 ss.setServiceParent(self.s)
3964 w = StorageStatus(ss)
3965 html = w.renderSynchronously()
3966 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3967 s = remove_tags(html)
3968 self.failUnlessIn("Accepting new shares: No", s)
3969 self.failUnlessIn("Total disk space: ?", s)
3970 self.failUnlessIn("Space Available to Tahoe: ?", s)
3971 self.failUnlessEqual(ss.get_available_space(), 0)
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_right_disk_stats(self, mock_get_disk_stats):
        """With disk stats mocked to a 5 GB disk and a 1 GB reservation,
        the status page must show each space figure and
        get_available_space() must equal 2 GB.

        NOTE(review): lines are missing from this copy: the `GB` /
        `total = 5*GB` definitions, and part of the mocked stats dict
        (presumably 'total', 'used', 'avail' entries plus the closing
        brace) — restore from VCS.
        """
        free_for_root = 4*GB
        free_for_nonroot = 3*GB
        reserved_space = 1*GB
        used = total - free_for_root
        # available space is free-for-nonroot minus the reservation, floored at 0
        avail = max(free_for_nonroot - reserved_space, 0)
        mock_get_disk_stats.return_value = {
            'free_for_root': free_for_root,
            'free_for_nonroot': free_for_nonroot,
        basedir = "storage/WebStatus/status_right_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
        expecteddir = ss.sharedir
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()

        # every call to get_disk_stats must have been made with
        # (sharedir, reserved_space)
        self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
                    mock_get_disk_stats.call_args_list)

        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Total disk space: 5.00 GB", s)
        self.failUnlessIn("Disk space used: - 1.00 GB", s)
        self.failUnlessIn("Disk space free (root): 4.00 GB", s)
        self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
        self.failUnlessIn("Reserved space: - 1.00 GB", s)
        self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
        self.failUnlessEqual(ss.get_available_space(), 2*GB)
4011 def test_readonly(self):
4012 basedir = "storage/WebStatus/readonly"
4013 fileutil.make_dirs(basedir)
4014 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4015 ss.setServiceParent(self.s)
4016 w = StorageStatus(ss)
4017 html = w.renderSynchronously()
4018 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4019 s = remove_tags(html)
4020 self.failUnlessIn("Accepting new shares: No", s)
4022 def test_reserved(self):
4023 basedir = "storage/WebStatus/reserved"
4024 fileutil.make_dirs(basedir)
4025 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4026 ss.setServiceParent(self.s)
4027 w = StorageStatus(ss)
4028 html = w.renderSynchronously()
4029 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4030 s = remove_tags(html)
4031 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4033 def test_huge_reserved(self):
4034 basedir = "storage/WebStatus/reserved"
4035 fileutil.make_dirs(basedir)
4036 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4037 ss.setServiceParent(self.s)
4038 w = StorageStatus(ss)
4039 html = w.renderSynchronously()
4040 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4041 s = remove_tags(html)
4042 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4044 def test_util(self):
4045 w = StorageStatus(None)
4046 self.failUnlessEqual(w.render_space(None, None), "?")
4047 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4048 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4049 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4050 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4051 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)