1 import time, os.path, platform, stat, re, simplejson, struct, shutil
5 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24 LayoutInvalid, MDMFSIGNABLEHEADER, \
25 SIGNED_PREFIX, MDMFHEADER, \
26 MDMFOFFSETS, SDMFSlotWriteProxy, \
29 VERIFICATION_KEY_SIZE, \
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
40 def __init__(self, ignore_disconnectors=False):
41 self.ignore = ignore_disconnectors
42 self.disconnectors = {}
43 def notifyOnDisconnect(self, f, *args, **kwargs):
47 self.disconnectors[m] = (f, args, kwargs)
49 def dontNotifyOnDisconnect(self, marker):
52 del self.disconnectors[marker]
54 class FakeStatsProvider:
55 def count(self, name, delta=1):
57 def register_producer(self, producer):
60 class Bucket(unittest.TestCase):
61 def make_workdir(self, name):
62 basedir = os.path.join("storage", "Bucket", name)
63 incoming = os.path.join(basedir, "tmp", "bucket")
64 final = os.path.join(basedir, "bucket")
65 fileutil.make_dirs(basedir)
66 fileutil.make_dirs(os.path.join(basedir, "tmp"))
67 return incoming, final
69 def bucket_writer_closed(self, bw, consumed):
71 def add_latency(self, category, latency):
73 def count(self, name, delta=1):
78 renew_secret = os.urandom(32)
79 cancel_secret = os.urandom(32)
80 expiration_time = time.time() + 5000
81 return LeaseInfo(owner_num, renew_secret, cancel_secret,
82 expiration_time, "\x00" * 20)
84 def test_create(self):
85 incoming, final = self.make_workdir("test_create")
86 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88 bw.remote_write(0, "a"*25)
89 bw.remote_write(25, "b"*25)
90 bw.remote_write(50, "c"*25)
91 bw.remote_write(75, "d"*7)
94 def test_readwrite(self):
95 incoming, final = self.make_workdir("test_readwrite")
96 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
98 bw.remote_write(0, "a"*25)
99 bw.remote_write(25, "b"*25)
100 bw.remote_write(50, "c"*7) # last block may be short
104 br = BucketReader(self, bw.finalhome)
105 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
109 def test_read_past_end_of_share_data(self):
110 # test vector for immutable files (hard-coded contents of an immutable share
113 # The following immutable share file content is identical to that
114 # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
115 # with share data == 'a'. The total size of this content is 85
118 containerdata = struct.pack('>LLL', 1, 1, 1)
120 # A Tahoe-LAFS storage client would send as the share_data a
121 # complicated string involving hash trees and a URI Extension Block
122 # -- see allmydata/immutable/layout.py . This test, which is
123 # simulating a client, just sends 'a'.
126 ownernumber = struct.pack('>L', 0)
127 renewsecret = 'THIS LETS ME RENEW YOUR FILE....'
128 assert len(renewsecret) == 32
129 cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
130 assert len(cancelsecret) == 32
131 expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
133 lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
135 share_file_data = containerdata + share_data + lease_data
137 incoming, final = self.make_workdir("test_read_past_end_of_share_data")
139 fileutil.write(final, share_file_data)
141 mockstorageserver = mock.Mock()
144 br = BucketReader(mockstorageserver, final)
146 self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
148 # Read past the end of share data to get the cancel secret.
149 read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
151 result_of_read = br.remote_read(0, read_length)
152 self.failUnlessEqual(result_of_read, share_data)
154 result_of_read = br.remote_read(0, len(share_data)+1)
155 self.failUnlessEqual(result_of_read, share_data)
163 def callRemote(self, methname, *args, **kwargs):
165 meth = getattr(self.target, "remote_" + methname)
166 return meth(*args, **kwargs)
168 if methname == "slot_readv":
170 if "writev" in methname:
171 self.write_count += 1
173 return defer.maybeDeferred(_call)
176 class BucketProxy(unittest.TestCase):
177 def make_bucket(self, name, size):
178 basedir = os.path.join("storage", "BucketProxy", name)
179 incoming = os.path.join(basedir, "tmp", "bucket")
180 final = os.path.join(basedir, "bucket")
181 fileutil.make_dirs(basedir)
182 fileutil.make_dirs(os.path.join(basedir, "tmp"))
183 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
189 def make_lease(self):
191 renew_secret = os.urandom(32)
192 cancel_secret = os.urandom(32)
193 expiration_time = time.time() + 5000
194 return LeaseInfo(owner_num, renew_secret, cancel_secret,
195 expiration_time, "\x00" * 20)
197 def bucket_writer_closed(self, bw, consumed):
199 def add_latency(self, category, latency):
201 def count(self, name, delta=1):
204 def test_create(self):
205 bw, rb, sharefname = self.make_bucket("test_create", 500)
206 bp = WriteBucketProxy(rb, None,
211 uri_extension_size_max=500)
212 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
214 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
215 # Let's pretend each share has 100 bytes of data, and that there are
216 # 4 segments (25 bytes each), and 8 shares total. So the two
217 # per-segment merkle trees (crypttext_hash_tree,
218 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
219 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
220 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
221 # long. That should make the whole share:
223 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
224 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
226 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
228 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
230 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
232 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
234 uri_extension = "s" + "E"*498 + "e"
236 bw, rb, sharefname = self.make_bucket(name, sharesize)
237 bp = wbp_class(rb, None,
242 uri_extension_size_max=len(uri_extension))
245 d.addCallback(lambda res: bp.put_block(0, "a"*25))
246 d.addCallback(lambda res: bp.put_block(1, "b"*25))
247 d.addCallback(lambda res: bp.put_block(2, "c"*25))
248 d.addCallback(lambda res: bp.put_block(3, "d"*20))
249 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
250 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
251 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
252 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
253 d.addCallback(lambda res: bp.close())
255 # now read everything back
256 def _start_reading(res):
257 br = BucketReader(self, sharefname)
260 server = NoNetworkServer("abc", None)
261 rbp = rbp_class(rb, server, storage_index="")
262 self.failUnlessIn("to peer", repr(rbp))
263 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
265 d1 = rbp.get_block_data(0, 25, 25)
266 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
267 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
268 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
269 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
270 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
271 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
272 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
274 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
275 d1.addCallback(lambda res:
276 self.failUnlessEqual(res, crypttext_hashes))
277 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
278 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
279 d1.addCallback(lambda res: rbp.get_share_hashes())
280 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
281 d1.addCallback(lambda res: rbp.get_uri_extension())
282 d1.addCallback(lambda res:
283 self.failUnlessEqual(res, uri_extension))
287 d.addCallback(_start_reading)
291 def test_readwrite_v1(self):
292 return self._do_test_readwrite("test_readwrite_v1",
293 0x24, WriteBucketProxy, ReadBucketProxy)
295 def test_readwrite_v2(self):
296 return self._do_test_readwrite("test_readwrite_v2",
297 0x44, WriteBucketProxy_v2, ReadBucketProxy)
299 class Server(unittest.TestCase):
302 self.sparent = LoggingServiceParent()
303 self.sparent.startService()
304 self._lease_secret = itertools.count()
306 return self.sparent.stopService()
308 def workdir(self, name):
309 basedir = os.path.join("storage", "Server", name)
312 def create(self, name, reserved_space=0, klass=StorageServer):
313 workdir = self.workdir(name)
314 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
315 stats_provider=FakeStatsProvider())
316 ss.setServiceParent(self.sparent)
319 def test_create(self):
320 self.create("test_create")
322 def test_declares_fixed_1528(self):
323 ss = self.create("test_declares_fixed_1528")
324 ver = ss.remote_get_version()
325 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
326 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
328 def allocate(self, ss, storage_index, sharenums, size, canary=None):
329 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
330 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
332 canary = FakeCanary()
333 return ss.remote_allocate_buckets(storage_index,
334 renew_secret, cancel_secret,
335 sharenums, size, canary)
337 def test_large_share(self):
338 syslow = platform.system().lower()
339 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
340 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
342 avail = fileutil.get_available_space('.', 512*2**20)
344 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
346 ss = self.create("test_large_share")
348 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
349 self.failUnlessEqual(already, set())
350 self.failUnlessEqual(set(writers.keys()), set([0]))
352 shnum, bucket = writers.items()[0]
353 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
354 bucket.remote_write(2**32, "ab")
355 bucket.remote_close()
357 readers = ss.remote_get_buckets("allocate")
358 reader = readers[shnum]
359 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
361 def test_dont_overfill_dirs(self):
363 This test asserts that if you add a second share whose storage index
364 share lots of leading bits with an extant share (but isn't the exact
365 same storage index), this won't add an entry to the share directory.
367 ss = self.create("test_dont_overfill_dirs")
368 already, writers = self.allocate(ss, "storageindex", [0], 10)
369 for i, wb in writers.items():
370 wb.remote_write(0, "%10d" % i)
372 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
374 children_of_storedir = set(os.listdir(storedir))
376 # Now store another one under another storageindex that has leading
377 # chars the same as the first storageindex.
378 already, writers = self.allocate(ss, "storageindey", [0], 10)
379 for i, wb in writers.items():
380 wb.remote_write(0, "%10d" % i)
382 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
384 new_children_of_storedir = set(os.listdir(storedir))
385 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
387 def test_remove_incoming(self):
388 ss = self.create("test_remove_incoming")
389 already, writers = self.allocate(ss, "vid", range(3), 10)
390 for i,wb in writers.items():
391 wb.remote_write(0, "%10d" % i)
393 incoming_share_dir = wb.incominghome
394 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
395 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
396 incoming_dir = os.path.dirname(incoming_prefix_dir)
397 self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
398 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
399 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
401 def test_abort(self):
402 # remote_abort, when called on a writer, should make sure that
403 # the allocated size of the bucket is not counted by the storage
404 # server when accounting for space.
405 ss = self.create("test_abort")
406 already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
407 self.failIfEqual(ss.allocated_size(), 0)
409 # Now abort the writers.
410 for writer in writers.itervalues():
411 writer.remote_abort()
412 self.failUnlessEqual(ss.allocated_size(), 0)
415 def test_allocate(self):
416 ss = self.create("test_allocate")
418 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
420 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
421 self.failUnlessEqual(already, set())
422 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
424 # while the buckets are open, they should not count as readable
425 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
428 for i,wb in writers.items():
429 wb.remote_write(0, "%25d" % i)
431 # aborting a bucket that was already closed is a no-op
434 # now they should be readable
435 b = ss.remote_get_buckets("allocate")
436 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
437 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
439 self.failUnlessIn("BucketReader", b_str)
440 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
442 # now if we ask about writing again, the server should offer those
443 # three buckets as already present. It should offer them even if we
444 # don't ask about those specific ones.
445 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
446 self.failUnlessEqual(already, set([0,1,2]))
447 self.failUnlessEqual(set(writers.keys()), set([3,4]))
449 # while those two buckets are open for writing, the server should
450 # refuse to offer them to uploaders
452 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
453 self.failUnlessEqual(already2, set([0,1,2]))
454 self.failUnlessEqual(set(writers2.keys()), set([5]))
456 # aborting the writes should remove the tempfiles
457 for i,wb in writers2.items():
459 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
460 self.failUnlessEqual(already2, set([0,1,2]))
461 self.failUnlessEqual(set(writers2.keys()), set([5]))
463 for i,wb in writers2.items():
465 for i,wb in writers.items():
468 def test_bad_container_version(self):
469 ss = self.create("test_bad_container_version")
470 a,w = self.allocate(ss, "si1", [0], 10)
471 w[0].remote_write(0, "\xff"*10)
474 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
477 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
480 ss.remote_get_buckets("allocate")
482 e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
483 ss.remote_get_buckets, "si1")
484 self.failUnlessIn(" had version 0 but we wanted 1", str(e))
486 def test_disconnect(self):
487 # simulate a disconnection
488 ss = self.create("test_disconnect")
489 canary = FakeCanary()
490 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
491 self.failUnlessEqual(already, set())
492 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
493 for (f,args,kwargs) in canary.disconnectors.values():
498 # that ought to delete the incoming shares
499 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
500 self.failUnlessEqual(already, set())
501 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
503 @mock.patch('allmydata.util.fileutil.get_disk_stats')
504 def test_reserved_space(self, mock_get_disk_stats):
506 mock_get_disk_stats.return_value = {
507 'free_for_nonroot': 15000,
508 'avail': max(15000 - reserved_space, 0),
511 ss = self.create("test_reserved_space", reserved_space=reserved_space)
512 # 15k available, 10k reserved, leaves 5k for shares
514 # a newly created and filled share incurs this much overhead, beyond
515 # the size we request.
517 LEASE_SIZE = 4+32+32+4
518 canary = FakeCanary(True)
519 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
520 self.failUnlessEqual(len(writers), 3)
521 # now the StorageServer should have 3000 bytes provisionally
522 # allocated, allowing only 2000 more to be claimed
523 self.failUnlessEqual(len(ss._active_writers), 3)
525 # allocating 1001-byte shares only leaves room for one
526 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
527 self.failUnlessEqual(len(writers2), 1)
528 self.failUnlessEqual(len(ss._active_writers), 4)
530 # we abandon the first set, so their provisional allocation should be
534 self.failUnlessEqual(len(ss._active_writers), 1)
535 # now we have a provisional allocation of 1001 bytes
537 # and we close the second set, so their provisional allocation should
538 # become real, long-term allocation, and grows to include the
540 for bw in writers2.values():
541 bw.remote_write(0, "a"*25)
546 self.failUnlessEqual(len(ss._active_writers), 0)
548 allocated = 1001 + OVERHEAD + LEASE_SIZE
550 # we have to manually increase available, since we're not doing real
552 mock_get_disk_stats.return_value = {
553 'free_for_nonroot': 15000 - allocated,
554 'avail': max(15000 - allocated - reserved_space, 0),
557 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
558 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
559 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
560 self.failUnlessEqual(len(writers3), 39)
561 self.failUnlessEqual(len(ss._active_writers), 39)
565 self.failUnlessEqual(len(ss._active_writers), 0)
566 ss.disownServiceParent()
570 basedir = self.workdir("test_seek_behavior")
571 fileutil.make_dirs(basedir)
572 filename = os.path.join(basedir, "testfile")
573 f = open(filename, "wb")
576 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
577 # files. mode="a" preserves previous contents but does not allow
578 # seeking-to-create-holes. mode="r+" allows both.
579 f = open(filename, "rb+")
583 filelen = os.stat(filename)[stat.ST_SIZE]
584 self.failUnlessEqual(filelen, 100+3)
585 f2 = open(filename, "rb")
586 self.failUnlessEqual(f2.read(5), "start")
589 def test_leases(self):
590 ss = self.create("test_leases")
591 canary = FakeCanary()
595 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
596 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
597 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
598 sharenums, size, canary)
599 self.failUnlessEqual(len(already), 0)
600 self.failUnlessEqual(len(writers), 5)
601 for wb in writers.values():
604 leases = list(ss.get_leases("si0"))
605 self.failUnlessEqual(len(leases), 1)
606 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
608 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
611 sharenums, size, canary)
612 for wb in writers.values():
615 # take out a second lease on si1
616 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
617 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
618 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
619 sharenums, size, canary)
620 self.failUnlessEqual(len(already), 5)
621 self.failUnlessEqual(len(writers), 0)
623 leases = list(ss.get_leases("si1"))
624 self.failUnlessEqual(len(leases), 2)
625 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
627 # and a third lease, using add-lease
628 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
629 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
630 ss.remote_add_lease("si1", rs2a, cs2a)
631 leases = list(ss.get_leases("si1"))
632 self.failUnlessEqual(len(leases), 3)
633 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
635 # add-lease on a missing storage index is silently ignored
636 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
638 # check that si0 is readable
639 readers = ss.remote_get_buckets("si0")
640 self.failUnlessEqual(len(readers), 5)
642 # renew the first lease. Only the proper renew_secret should work
643 ss.remote_renew_lease("si0", rs0)
644 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
645 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
647 # check that si0 is still readable
648 readers = ss.remote_get_buckets("si0")
649 self.failUnlessEqual(len(readers), 5)
651 # There is no such method as remote_cancel_lease for now -- see
653 self.failIf(hasattr(ss, 'remote_cancel_lease'), \
654 "ss should not have a 'remote_cancel_lease' method/attribute")
656 # test overlapping uploads
657 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
658 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
659 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
660 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
661 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
662 sharenums, size, canary)
663 self.failUnlessEqual(len(already), 0)
664 self.failUnlessEqual(len(writers), 5)
665 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
666 sharenums, size, canary)
667 self.failUnlessEqual(len(already2), 0)
668 self.failUnlessEqual(len(writers2), 0)
669 for wb in writers.values():
672 leases = list(ss.get_leases("si3"))
673 self.failUnlessEqual(len(leases), 1)
675 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
676 sharenums, size, canary)
677 self.failUnlessEqual(len(already3), 5)
678 self.failUnlessEqual(len(writers3), 0)
680 leases = list(ss.get_leases("si3"))
681 self.failUnlessEqual(len(leases), 2)
683 def test_readonly(self):
684 workdir = self.workdir("test_readonly")
685 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
686 ss.setServiceParent(self.sparent)
688 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
689 self.failUnlessEqual(already, set())
690 self.failUnlessEqual(writers, {})
692 stats = ss.get_stats()
693 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
694 if "storage_server.disk_avail" in stats:
695 # Some platforms may not have an API to get disk stats.
696 # But if there are stats, readonly_storage means disk_avail=0
697 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
699 def test_discard(self):
700 # discard is really only used for other tests, but we test it anyways
701 workdir = self.workdir("test_discard")
702 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
703 ss.setServiceParent(self.sparent)
705 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
706 self.failUnlessEqual(already, set())
707 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
708 for i,wb in writers.items():
709 wb.remote_write(0, "%25d" % i)
711 # since we discard the data, the shares should be present but sparse.
712 # Since we write with some seeks, the data we read back will be all
714 b = ss.remote_get_buckets("vid")
715 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
716 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
718 def test_advise_corruption(self):
719 workdir = self.workdir("test_advise_corruption")
720 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
721 ss.setServiceParent(self.sparent)
723 si0_s = base32.b2a("si0")
724 ss.remote_advise_corrupt_share("immutable", "si0", 0,
725 "This share smells funny.\n")
726 reportdir = os.path.join(workdir, "corruption-advisories")
727 reports = os.listdir(reportdir)
728 self.failUnlessEqual(len(reports), 1)
729 report_si0 = reports[0]
730 self.failUnlessIn(si0_s, report_si0)
731 f = open(os.path.join(reportdir, report_si0), "r")
734 self.failUnlessIn("type: immutable", report)
735 self.failUnlessIn("storage_index: %s" % si0_s, report)
736 self.failUnlessIn("share_number: 0", report)
737 self.failUnlessIn("This share smells funny.", report)
739 # test the RIBucketWriter version too
740 si1_s = base32.b2a("si1")
741 already,writers = self.allocate(ss, "si1", [1], 75)
742 self.failUnlessEqual(already, set())
743 self.failUnlessEqual(set(writers.keys()), set([1]))
744 writers[1].remote_write(0, "data")
745 writers[1].remote_close()
747 b = ss.remote_get_buckets("si1")
748 self.failUnlessEqual(set(b.keys()), set([1]))
749 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
751 reports = os.listdir(reportdir)
752 self.failUnlessEqual(len(reports), 2)
753 report_si1 = [r for r in reports if si1_s in r][0]
754 f = open(os.path.join(reportdir, report_si1), "r")
757 self.failUnlessIn("type: immutable", report)
758 self.failUnlessIn("storage_index: %s" % si1_s, report)
759 self.failUnlessIn("share_number: 1", report)
760 self.failUnlessIn("This share tastes like dust.", report)
764 class MutableServer(unittest.TestCase):
767 self.sparent = LoggingServiceParent()
768 self._lease_secret = itertools.count()
770 return self.sparent.stopService()
772 def workdir(self, name):
773 basedir = os.path.join("storage", "MutableServer", name)
776 def create(self, name):
777 workdir = self.workdir(name)
778 ss = StorageServer(workdir, "\x00" * 20)
779 ss.setServiceParent(self.sparent)
782 def test_create(self):
783 self.create("test_create")
785 def write_enabler(self, we_tag):
786 return hashutil.tagged_hash("we_blah", we_tag)
788 def renew_secret(self, tag):
789 return hashutil.tagged_hash("renew_blah", str(tag))
791 def cancel_secret(self, tag):
792 return hashutil.tagged_hash("cancel_blah", str(tag))
794 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
795 write_enabler = self.write_enabler(we_tag)
796 renew_secret = self.renew_secret(lease_tag)
797 cancel_secret = self.cancel_secret(lease_tag)
798 rstaraw = ss.remote_slot_testv_and_readv_and_writev
799 testandwritev = dict( [ (shnum, ([], [], None) )
800 for shnum in sharenums ] )
802 rc = rstaraw(storage_index,
803 (write_enabler, renew_secret, cancel_secret),
806 (did_write, readv_data) = rc
807 self.failUnless(did_write)
808 self.failUnless(isinstance(readv_data, dict))
809 self.failUnlessEqual(len(readv_data), 0)
811 def test_bad_magic(self):
812 ss = self.create("test_bad_magic")
813 self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
814 fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
819 read = ss.remote_slot_readv
820 e = self.failUnlessRaises(UnknownMutableContainerVersionError,
821 read, "si1", [0], [(0,10)])
822 self.failUnlessIn(" had magic ", str(e))
823 self.failUnlessIn(" but we wanted ", str(e))
825 def test_container_size(self):
826 ss = self.create("test_container_size")
827 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
829 read = ss.remote_slot_readv
830 rstaraw = ss.remote_slot_testv_and_readv_and_writev
831 secrets = ( self.write_enabler("we1"),
832 self.renew_secret("we1"),
833 self.cancel_secret("we1") )
834 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
835 answer = rstaraw("si1", secrets,
836 {0: ([], [(0,data)], len(data)+12)},
838 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
840 # Trying to make the container too large (by sending a write vector
841 # whose offset is too high) will raise an exception.
842 TOOBIG = MutableShareFile.MAX_SIZE + 10
843 self.failUnlessRaises(DataTooLargeError,
844 rstaraw, "si1", secrets,
845 {0: ([], [(TOOBIG,data)], None)},
848 answer = rstaraw("si1", secrets,
849 {0: ([], [(0,data)], None)},
851 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
853 read_answer = read("si1", [0], [(0,10)])
854 self.failUnlessEqual(read_answer, {0: [data[:10]]})
856 # Sending a new_length shorter than the current length truncates the
858 answer = rstaraw("si1", secrets,
861 read_answer = read("si1", [0], [(0,10)])
862 self.failUnlessEqual(read_answer, {0: [data[:9]]})
864 # Sending a new_length longer than the current length doesn't change
866 answer = rstaraw("si1", secrets,
869 assert answer == (True, {0:[],1:[],2:[]})
870 read_answer = read("si1", [0], [(0, 20)])
871 self.failUnlessEqual(read_answer, {0: [data[:9]]})
873 # Sending a write vector whose start is after the end of the current
874 # data doesn't reveal "whatever was there last time" (palimpsest),
875 # but instead fills with zeroes.
877 # To test this, we fill the data area with a recognizable pattern.
878 pattern = ''.join([chr(i) for i in range(100)])
879 answer = rstaraw("si1", secrets,
880 {0: ([], [(0, pattern)], None)},
882 assert answer == (True, {0:[],1:[],2:[]})
883 # Then truncate the data...
884 answer = rstaraw("si1", secrets,
887 assert answer == (True, {0:[],1:[],2:[]})
888 # Just confirm that you get an empty string if you try to read from
889 # past the (new) endpoint now.
890 answer = rstaraw("si1", secrets,
893 self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))
895 # Then the extend the file by writing a vector which starts out past
897 answer = rstaraw("si1", secrets,
898 {0: ([], [(50, 'hellothere')], None)},
900 assert answer == (True, {0:[],1:[],2:[]})
901 # Now if you read the stuff between 20 (where we earlier truncated)
902 # and 50, it had better be all zeroes.
903 answer = rstaraw("si1", secrets,
906 self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))
908 # Also see if the server explicitly declares that it supports this
910 ver = ss.remote_get_version()
911 storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
912 self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
914 # If the size is dropped to zero the share is deleted.
915 answer = rstaraw("si1", secrets,
916 {0: ([], [(0,data)], 0)},
918 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
920 read_answer = read("si1", [0], [(0,10)])
921 self.failUnlessEqual(read_answer, {})
923 def test_allocate(self):
924 ss = self.create("test_allocate")
925 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
928 read = ss.remote_slot_readv
929 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
931 self.failUnlessEqual(read("si1", [], [(0, 10)]),
932 {0: [""], 1: [""], 2: [""]})
933 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
937 secrets = ( self.write_enabler("we1"),
938 self.renew_secret("we1"),
939 self.cancel_secret("we1") )
940 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
941 write = ss.remote_slot_testv_and_readv_and_writev
942 answer = write("si1", secrets,
943 {0: ([], [(0,data)], None)},
945 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
947 self.failUnlessEqual(read("si1", [0], [(0,20)]),
948 {0: ["00000000001111111111"]})
949 self.failUnlessEqual(read("si1", [0], [(95,10)]),
951 #self.failUnlessEqual(s0.remote_get_length(), 100)
953 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
954 f = self.failUnlessRaises(BadWriteEnablerError,
955 write, "si1", bad_secrets,
957 self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
959 # this testv should fail
960 answer = write("si1", secrets,
961 {0: ([(0, 12, "eq", "444444444444"),
962 (20, 5, "eq", "22222"),
969 self.failUnlessEqual(answer, (False,
970 {0: ["000000000011", "22222"],
974 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
977 answer = write("si1", secrets,
978 {0: ([(10, 5, "lt", "11111"),
985 self.failUnlessEqual(answer, (False,
990 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
993 def test_operators(self):
994 # test operators, the data we're comparing is '11111' in all cases.
995 # test both fail+pass, reset data after each one.
996 ss = self.create("test_operators")
998 secrets = ( self.write_enabler("we1"),
999 self.renew_secret("we1"),
1000 self.cancel_secret("we1") )
1001 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1002 write = ss.remote_slot_testv_and_readv_and_writev
1003 read = ss.remote_slot_readv
1006 write("si1", secrets,
1007 {0: ([], [(0,data)], None)},
1013 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
1018 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1019 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1020 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
1023 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1028 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1032 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1037 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1038 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1042 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1047 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1051 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1056 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1057 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1060 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1065 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1066 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1070 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1075 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1076 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1079 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1084 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1085 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1089 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1094 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1095 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1098 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1103 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1104 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1108 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1113 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1114 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1117 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1122 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1123 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1126 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1131 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1132 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1136 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1141 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1142 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1145 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1150 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1151 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1154 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1159 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1160 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1163 # finally, test some operators against empty shares
1164 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1169 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1170 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1173 def test_readv(self):
1174 ss = self.create("test_readv")
1175 secrets = ( self.write_enabler("we1"),
1176 self.renew_secret("we1"),
1177 self.cancel_secret("we1") )
1178 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1179 write = ss.remote_slot_testv_and_readv_and_writev
1180 read = ss.remote_slot_readv
1181 data = [("%d" % i) * 100 for i in range(3)]
1182 rc = write("si1", secrets,
1183 {0: ([], [(0,data[0])], None),
1184 1: ([], [(0,data[1])], None),
1185 2: ([], [(0,data[2])], None),
1187 self.failUnlessEqual(rc, (True, {}))
1189 answer = read("si1", [], [(0, 10)])
1190 self.failUnlessEqual(answer, {0: ["0"*10],
1194 def compare_leases_without_timestamps(self, leases_a, leases_b):
1195 self.failUnlessEqual(len(leases_a), len(leases_b))
1196 for i in range(len(leases_a)):
1199 self.failUnlessEqual(a.owner_num, b.owner_num)
1200 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1201 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1202 self.failUnlessEqual(a.nodeid, b.nodeid)
1204 def compare_leases(self, leases_a, leases_b):
1205 self.failUnlessEqual(len(leases_a), len(leases_b))
1206 for i in range(len(leases_a)):
1209 self.failUnlessEqual(a.owner_num, b.owner_num)
1210 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1211 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1212 self.failUnlessEqual(a.nodeid, b.nodeid)
1213 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1215 def test_leases(self):
1216 ss = self.create("test_leases")
1218 return ( self.write_enabler("we1"),
1219 self.renew_secret("we1-%d" % n),
1220 self.cancel_secret("we1-%d" % n) )
1221 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1222 write = ss.remote_slot_testv_and_readv_and_writev
1223 read = ss.remote_slot_readv
1224 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1225 self.failUnlessEqual(rc, (True, {}))
1227 # create a random non-numeric file in the bucket directory, to
1228 # exercise the code that's supposed to ignore those.
1229 bucket_dir = os.path.join(self.workdir("test_leases"),
1230 "shares", storage_index_to_dir("si1"))
1231 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1232 f.write("you ought to be ignoring me\n")
1235 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1236 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1238 # add-lease on a missing storage index is silently ignored
1239 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1241 # re-allocate the slots and use the same secrets, that should update
1243 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1244 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1247 ss.remote_renew_lease("si1", secrets(0)[1])
1248 self.failUnlessEqual(len(list(s0.get_leases())), 1)
1250 # now allocate them with a bunch of different secrets, to trigger the
1251 # extended lease code. Use add_lease for one of them.
1252 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1253 self.failUnlessEqual(len(list(s0.get_leases())), 2)
1254 secrets2 = secrets(2)
1255 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1256 self.failUnlessEqual(len(list(s0.get_leases())), 3)
1257 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1258 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1259 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1261 self.failUnlessEqual(len(list(s0.get_leases())), 6)
1263 all_leases = list(s0.get_leases())
1264 # and write enough data to expand the container, forcing the server
1265 # to move the leases
1266 write("si1", secrets(0),
1267 {0: ([], [(0,data)], 200), },
1270 # read back the leases, make sure they're still intact.
1271 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1273 ss.remote_renew_lease("si1", secrets(0)[1])
1274 ss.remote_renew_lease("si1", secrets(1)[1])
1275 ss.remote_renew_lease("si1", secrets(2)[1])
1276 ss.remote_renew_lease("si1", secrets(3)[1])
1277 ss.remote_renew_lease("si1", secrets(4)[1])
1278 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1279 # get a new copy of the leases, with the current timestamps. Reading
1280 # data and failing to renew/cancel leases should leave the timestamps
1282 all_leases = list(s0.get_leases())
1283 # renewing with a bogus token should prompt an error message
1285 # examine the exception thus raised, make sure the old nodeid is
1286 # present, to provide for share migration
1287 e = self.failUnlessRaises(IndexError,
1288 ss.remote_renew_lease, "si1",
1291 self.failUnlessIn("Unable to renew non-existent lease", e_s)
1292 self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1293 self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1295 self.compare_leases(all_leases, list(s0.get_leases()))
1297 # reading shares should not modify the timestamp
1298 read("si1", [], [(0,200)])
1299 self.compare_leases(all_leases, list(s0.get_leases()))
1301 write("si1", secrets(0),
1302 {0: ([], [(200, "make me bigger")], None)}, [])
1303 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1305 write("si1", secrets(0),
1306 {0: ([], [(500, "make me really bigger")], None)}, [])
1307 self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1309 def test_remove(self):
1310 ss = self.create("test_remove")
1311 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1313 readv = ss.remote_slot_readv
1314 writev = ss.remote_slot_testv_and_readv_and_writev
1315 secrets = ( self.write_enabler("we1"),
1316 self.renew_secret("we1"),
1317 self.cancel_secret("we1") )
1318 # delete sh0 by setting its size to zero
1319 answer = writev("si1", secrets,
1322 # the answer should mention all the shares that existed before the
1324 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1325 # but a new read should show only sh1 and sh2
1326 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1329 # delete sh1 by setting its size to zero
1330 answer = writev("si1", secrets,
1333 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1334 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1337 # delete sh2 by setting its size to zero
1338 answer = writev("si1", secrets,
1341 self.failUnlessEqual(answer, (True, {2:[]}) )
1342 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1344 # and the bucket directory should now be gone
1345 si = base32.b2a("si1")
1346 # note: this is a detail of the storage server implementation, and
1347 # may change in the future
1349 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1350 bucketdir = os.path.join(prefixdir, si)
1351 self.failUnless(os.path.exists(prefixdir), prefixdir)
1352 self.failIf(os.path.exists(bucketdir), bucketdir)
1355 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1357 self.sparent = LoggingServiceParent()
1358 self._lease_secret = itertools.count()
1359 self.ss = self.create("MDMFProxies storage test server")
1360 self.rref = RemoteBucket()
1361 self.rref.target = self.ss
1362 self.secrets = (self.write_enabler("we_secret"),
1363 self.renew_secret("renew_secret"),
1364 self.cancel_secret("cancel_secret"))
1365 self.segment = "aaaaaa"
1367 self.salt = "a" * 16
1368 self.block_hash = "a" * 32
1369 self.block_hash_tree = [self.block_hash for i in xrange(6)]
1370 self.share_hash = self.block_hash
1371 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1372 self.signature = "foobarbaz"
1373 self.verification_key = "vvvvvv"
1374 self.encprivkey = "private"
1375 self.root_hash = self.block_hash
1376 self.salt_hash = self.root_hash
1377 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1378 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1379 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1380 # blockhashes and salt hashes are serialized in the same way,
1381 # only we lop off the first element and store that in the
1383 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1387 self.sparent.stopService()
1388 shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1391 def write_enabler(self, we_tag):
1392 return hashutil.tagged_hash("we_blah", we_tag)
1395 def renew_secret(self, tag):
1396 return hashutil.tagged_hash("renew_blah", str(tag))
1399 def cancel_secret(self, tag):
1400 return hashutil.tagged_hash("cancel_blah", str(tag))
1403 def workdir(self, name):
1404 basedir = os.path.join("storage", "MutableServer", name)
1408 def create(self, name):
1409 workdir = self.workdir(name)
1410 ss = StorageServer(workdir, "\x00" * 20)
1411 ss.setServiceParent(self.sparent)
1415 def build_test_mdmf_share(self, tail_segment=False, empty=False):
1416 # Start with the checkstring
1417 data = struct.pack(">BQ32s",
1421 self.checkstring = data
1422 # Next, the encoding parameters
1424 data += struct.pack(">BBQQ",
1430 data += struct.pack(">BBQQ",
1436 data += struct.pack(">BBQQ",
1441 # Now we'll build the offsets.
1443 if not tail_segment and not empty:
1445 sharedata += self.salt + self.block
1448 sharedata += self.salt + self.block
1449 sharedata += self.salt + "a"
1451 # The encrypted private key comes after the shares + salts
1452 offset_size = struct.calcsize(MDMFOFFSETS)
1453 encrypted_private_key_offset = len(data) + offset_size
1454 # The share has chain comes after the private key
1455 sharehashes_offset = encrypted_private_key_offset + \
1456 len(self.encprivkey)
1458 # The signature comes after the share hash chain.
1459 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1461 verification_key_offset = signature_offset + len(self.signature)
1462 verification_key_end = verification_key_offset + \
1463 len(self.verification_key)
1465 share_data_offset = offset_size
1466 share_data_offset += PRIVATE_KEY_SIZE
1467 share_data_offset += SIGNATURE_SIZE
1468 share_data_offset += VERIFICATION_KEY_SIZE
1469 share_data_offset += SHARE_HASH_CHAIN_SIZE
1471 blockhashes_offset = share_data_offset + len(sharedata)
1472 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1474 data += struct.pack(MDMFOFFSETS,
1475 encrypted_private_key_offset,
1478 verification_key_offset,
1479 verification_key_end,
1485 self.offsets['enc_privkey'] = encrypted_private_key_offset
1486 self.offsets['block_hash_tree'] = blockhashes_offset
1487 self.offsets['share_hash_chain'] = sharehashes_offset
1488 self.offsets['signature'] = signature_offset
1489 self.offsets['verification_key'] = verification_key_offset
1490 self.offsets['share_data'] = share_data_offset
1491 self.offsets['verification_key_end'] = verification_key_end
1492 self.offsets['EOF'] = eof_offset
1495 data += self.encprivkey
1497 data += self.share_hash_chain_s
1499 data += self.signature
1500 # and the verification key
1501 data += self.verification_key
1502 # Then we'll add in gibberish until we get to the right point.
1503 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1506 # Then the share data
1509 data += self.block_hash_tree_s
1513 def write_test_share_to_server(self,
1518 I write some data for the read tests to read to self.ss
1520 If tail_segment=True, then I will write a share that has a
1521 smaller tail segment than other segments.
1523 write = self.ss.remote_slot_testv_and_readv_and_writev
1524 data = self.build_test_mdmf_share(tail_segment, empty)
1525 # Finally, we write the whole thing to the storage server in one
1527 testvs = [(0, 1, "eq", "")]
1529 tws[0] = (testvs, [(0, data)], None)
1531 results = write(storage_index, self.secrets, tws, readv)
1532 self.failUnless(results[0])
1535 def build_test_sdmf_share(self, empty=False):
1539 sharedata = self.segment * 6
1540 self.sharedata = sharedata
1541 blocksize = len(sharedata) / 3
1542 block = sharedata[:blocksize]
1543 self.blockdata = block
1544 prefix = struct.pack(">BQ32s16s BBQQ",
1554 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1555 signature_offset = post_offset + len(self.verification_key)
1556 sharehashes_offset = signature_offset + len(self.signature)
1557 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1558 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1559 encprivkey_offset = sharedata_offset + len(block)
1560 eof_offset = encprivkey_offset + len(self.encprivkey)
1561 offsets = struct.pack(">LLLLQQ",
1568 final_share = "".join([prefix,
1570 self.verification_key,
1572 self.share_hash_chain_s,
1573 self.block_hash_tree_s,
1577 self.offsets['signature'] = signature_offset
1578 self.offsets['share_hash_chain'] = sharehashes_offset
1579 self.offsets['block_hash_tree'] = blockhashes_offset
1580 self.offsets['share_data'] = sharedata_offset
1581 self.offsets['enc_privkey'] = encprivkey_offset
1582 self.offsets['EOF'] = eof_offset
1586 def write_sdmf_share_to_server(self,
1589 # Some tests need SDMF shares to verify that we can still
1590 # read them. This method writes one, which resembles but is not
1592 write = self.ss.remote_slot_testv_and_readv_and_writev
1593 share = self.build_test_sdmf_share(empty)
1594 testvs = [(0, 1, "eq", "")]
1596 tws[0] = (testvs, [(0, share)], None)
1598 results = write(storage_index, self.secrets, tws, readv)
1599 self.failUnless(results[0])
1602 def test_read(self):
1603 self.write_test_share_to_server("si1")
1604 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1605 # Check that every method equals what we expect it to.
1606 d = defer.succeed(None)
1607 def _check_block_and_salt((block, salt)):
1608 self.failUnlessEqual(block, self.block)
1609 self.failUnlessEqual(salt, self.salt)
1612 d.addCallback(lambda ignored, i=i:
1613 mr.get_block_and_salt(i))
1614 d.addCallback(_check_block_and_salt)
1616 d.addCallback(lambda ignored:
1617 mr.get_encprivkey())
1618 d.addCallback(lambda encprivkey:
1619 self.failUnlessEqual(self.encprivkey, encprivkey))
1621 d.addCallback(lambda ignored:
1622 mr.get_blockhashes())
1623 d.addCallback(lambda blockhashes:
1624 self.failUnlessEqual(self.block_hash_tree, blockhashes))
1626 d.addCallback(lambda ignored:
1627 mr.get_sharehashes())
1628 d.addCallback(lambda sharehashes:
1629 self.failUnlessEqual(self.share_hash_chain, sharehashes))
1631 d.addCallback(lambda ignored:
1633 d.addCallback(lambda signature:
1634 self.failUnlessEqual(signature, self.signature))
1636 d.addCallback(lambda ignored:
1637 mr.get_verification_key())
1638 d.addCallback(lambda verification_key:
1639 self.failUnlessEqual(verification_key, self.verification_key))
1641 d.addCallback(lambda ignored:
1643 d.addCallback(lambda seqnum:
1644 self.failUnlessEqual(seqnum, 0))
1646 d.addCallback(lambda ignored:
1648 d.addCallback(lambda root_hash:
1649 self.failUnlessEqual(self.root_hash, root_hash))
1651 d.addCallback(lambda ignored:
1653 d.addCallback(lambda seqnum:
1654 self.failUnlessEqual(0, seqnum))
1656 d.addCallback(lambda ignored:
1657 mr.get_encoding_parameters())
1658 def _check_encoding_parameters((k, n, segsize, datalen)):
1659 self.failUnlessEqual(k, 3)
1660 self.failUnlessEqual(n, 10)
1661 self.failUnlessEqual(segsize, 6)
1662 self.failUnlessEqual(datalen, 36)
1663 d.addCallback(_check_encoding_parameters)
1665 d.addCallback(lambda ignored:
1666 mr.get_checkstring())
1667 d.addCallback(lambda checkstring:
1668 self.failUnlessEqual(checkstring, checkstring))
1672 def test_read_with_different_tail_segment_size(self):
1673 self.write_test_share_to_server("si1", tail_segment=True)
1674 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1675 d = mr.get_block_and_salt(5)
1676 def _check_tail_segment(results):
1677 block, salt = results
1678 self.failUnlessEqual(len(block), 1)
1679 self.failUnlessEqual(block, "a")
1680 d.addCallback(_check_tail_segment)
1684 def test_get_block_with_invalid_segnum(self):
1685 self.write_test_share_to_server("si1")
1686 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1687 d = defer.succeed(None)
1688 d.addCallback(lambda ignored:
1689 self.shouldFail(LayoutInvalid, "test invalid segnum",
1691 mr.get_block_and_salt, 7))
1695 def test_get_encoding_parameters_first(self):
1696 self.write_test_share_to_server("si1")
1697 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1698 d = mr.get_encoding_parameters()
1699 def _check_encoding_parameters((k, n, segment_size, datalen)):
1700 self.failUnlessEqual(k, 3)
1701 self.failUnlessEqual(n, 10)
1702 self.failUnlessEqual(segment_size, 6)
1703 self.failUnlessEqual(datalen, 36)
1704 d.addCallback(_check_encoding_parameters)
1708 def test_get_seqnum_first(self):
1709 self.write_test_share_to_server("si1")
1710 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1712 d.addCallback(lambda seqnum:
1713 self.failUnlessEqual(seqnum, 0))
1717 def test_get_root_hash_first(self):
1718 self.write_test_share_to_server("si1")
1719 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1720 d = mr.get_root_hash()
1721 d.addCallback(lambda root_hash:
1722 self.failUnlessEqual(root_hash, self.root_hash))
1726 def test_get_checkstring_first(self):
1727 self.write_test_share_to_server("si1")
1728 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1729 d = mr.get_checkstring()
1730 d.addCallback(lambda checkstring:
1731 self.failUnlessEqual(checkstring, self.checkstring))
1735 def test_write_read_vectors(self):
1736 # When writing for us, the storage server will return to us a
1737 # read vector, along with its result. If a write fails because
1738 # the test vectors failed, this read vector can help us to
1739 # diagnose the problem. This test ensures that the read vector
1740 # is working appropriately.
1741 mw = self._make_new_mw("si1", 0)
1744 mw.put_block(self.block, i, self.salt)
1745 mw.put_encprivkey(self.encprivkey)
1746 mw.put_blockhashes(self.block_hash_tree)
1747 mw.put_sharehashes(self.share_hash_chain)
1748 mw.put_root_hash(self.root_hash)
1749 mw.put_signature(self.signature)
1750 mw.put_verification_key(self.verification_key)
1751 d = mw.finish_publishing()
1753 self.failUnless(len(results), 2)
1754 result, readv = results
1755 self.failUnless(result)
1757 self.old_checkstring = mw.get_checkstring()
1758 mw.set_checkstring("")
1759 d.addCallback(_then)
1760 d.addCallback(lambda ignored:
1761 mw.finish_publishing())
1762 def _then_again(results):
1763 self.failUnlessEqual(len(results), 2)
1764 result, readvs = results
1766 self.failUnlessIn(0, readvs)
1767 readv = readvs[0][0]
1768 self.failUnlessEqual(readv, self.old_checkstring)
1769 d.addCallback(_then_again)
1770 # The checkstring remains the same for the rest of the process.
1774 def test_private_key_after_share_hash_chain(self):
1775 mw = self._make_new_mw("si1", 0)
1776 d = defer.succeed(None)
1778 d.addCallback(lambda ignored, i=i:
1779 mw.put_block(self.block, i, self.salt))
1780 d.addCallback(lambda ignored:
1781 mw.put_encprivkey(self.encprivkey))
1782 d.addCallback(lambda ignored:
1783 mw.put_sharehashes(self.share_hash_chain))
1785 # Now try to put the private key again.
1786 d.addCallback(lambda ignored:
1787 self.shouldFail(LayoutInvalid, "test repeat private key",
1789 mw.put_encprivkey, self.encprivkey))
1793 def test_signature_after_verification_key(self):
1794 mw = self._make_new_mw("si1", 0)
1795 d = defer.succeed(None)
1796 # Put everything up to and including the verification key.
1798 d.addCallback(lambda ignored, i=i:
1799 mw.put_block(self.block, i, self.salt))
1800 d.addCallback(lambda ignored:
1801 mw.put_encprivkey(self.encprivkey))
1802 d.addCallback(lambda ignored:
1803 mw.put_blockhashes(self.block_hash_tree))
1804 d.addCallback(lambda ignored:
1805 mw.put_sharehashes(self.share_hash_chain))
1806 d.addCallback(lambda ignored:
1807 mw.put_root_hash(self.root_hash))
1808 d.addCallback(lambda ignored:
1809 mw.put_signature(self.signature))
1810 d.addCallback(lambda ignored:
1811 mw.put_verification_key(self.verification_key))
1812 # Now try to put the signature again. This should fail
1813 d.addCallback(lambda ignored:
1814 self.shouldFail(LayoutInvalid, "signature after verification",
1816 mw.put_signature, self.signature))
1820 def test_uncoordinated_write(self):
1821 # Make two mutable writers, both pointing to the same storage
1822 # server, both at the same storage index, and try writing to the
1824 mw1 = self._make_new_mw("si1", 0)
1825 mw2 = self._make_new_mw("si1", 0)
1827 def _check_success(results):
1828 result, readvs = results
1829 self.failUnless(result)
1831 def _check_failure(results):
1832 result, readvs = results
1835 def _write_share(mw):
1837 mw.put_block(self.block, i, self.salt)
1838 mw.put_encprivkey(self.encprivkey)
1839 mw.put_blockhashes(self.block_hash_tree)
1840 mw.put_sharehashes(self.share_hash_chain)
1841 mw.put_root_hash(self.root_hash)
1842 mw.put_signature(self.signature)
1843 mw.put_verification_key(self.verification_key)
1844 return mw.finish_publishing()
1845 d = _write_share(mw1)
1846 d.addCallback(_check_success)
1847 d.addCallback(lambda ignored:
1849 d.addCallback(_check_failure)
1853 def test_invalid_salt_size(self):
1854 # Salts need to be 16 bytes in size. Writes that attempt to
1855 # write more or less than this should be rejected.
1856 mw = self._make_new_mw("si1", 0)
1857 invalid_salt = "a" * 17 # 17 bytes
1858 another_invalid_salt = "b" * 15 # 15 bytes
1859 d = defer.succeed(None)
1860 d.addCallback(lambda ignored:
1861 self.shouldFail(LayoutInvalid, "salt too big",
1863 mw.put_block, self.block, 0, invalid_salt))
1864 d.addCallback(lambda ignored:
1865 self.shouldFail(LayoutInvalid, "salt too small",
1867 mw.put_block, self.block, 0,
1868 another_invalid_salt))
1872 def test_write_test_vectors(self):
1873 # If we give the write proxy a bogus test vector at
1874 # any point during the process, it should fail to write when we
1876 def _check_failure(results):
1877 self.failUnlessEqual(len(results), 2)
1881 def _check_success(results):
1882 self.failUnlessEqual(len(results), 2)
1884 self.failUnless(results)
1886 mw = self._make_new_mw("si1", 0)
1887 mw.set_checkstring("this is a lie")
1889 mw.put_block(self.block, i, self.salt)
1890 mw.put_encprivkey(self.encprivkey)
1891 mw.put_blockhashes(self.block_hash_tree)
1892 mw.put_sharehashes(self.share_hash_chain)
1893 mw.put_root_hash(self.root_hash)
1894 mw.put_signature(self.signature)
1895 mw.put_verification_key(self.verification_key)
1896 d = mw.finish_publishing()
1897 d.addCallback(_check_failure)
1898 d.addCallback(lambda ignored:
1899 mw.set_checkstring(""))
1900 d.addCallback(lambda ignored:
1901 mw.finish_publishing())
1902 d.addCallback(_check_success)
1906 def serialize_blockhashes(self, blockhashes):
1907 return "".join(blockhashes)
1910 def serialize_sharehashes(self, sharehashes):
1911 ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1912 for i in sorted(sharehashes.keys())])
1916 def test_write(self):
1917 # This translates to a file with 6 6-byte segments, and with 2-byte
1919 mw = self._make_new_mw("si1", 0)
1920 # Test writing some blocks.
1921 read = self.ss.remote_slot_readv
1922 expected_private_key_offset = struct.calcsize(MDMFHEADER)
1923 expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1924 PRIVATE_KEY_SIZE + \
1926 VERIFICATION_KEY_SIZE + \
1927 SHARE_HASH_CHAIN_SIZE
1928 written_block_size = 2 + len(self.salt)
1929 written_block = self.block + self.salt
1931 mw.put_block(self.block, i, self.salt)
1933 mw.put_encprivkey(self.encprivkey)
1934 mw.put_blockhashes(self.block_hash_tree)
1935 mw.put_sharehashes(self.share_hash_chain)
1936 mw.put_root_hash(self.root_hash)
1937 mw.put_signature(self.signature)
1938 mw.put_verification_key(self.verification_key)
1939 d = mw.finish_publishing()
1940 def _check_publish(results):
1941 self.failUnlessEqual(len(results), 2)
1942 result, ign = results
1943 self.failUnless(result, "publish failed")
1945 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1946 {0: [written_block]})
1948 self.failUnlessEqual(len(self.encprivkey), 7)
1949 self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1950 {0: [self.encprivkey]})
1952 expected_block_hash_offset = expected_sharedata_offset + \
1953 (6 * written_block_size)
1954 self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1955 self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1956 {0: [self.block_hash_tree_s]})
1958 expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1959 self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1960 {0: [self.share_hash_chain_s]})
1962 self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1963 {0: [self.root_hash]})
1964 expected_signature_offset = expected_share_hash_offset + \
1965 len(self.share_hash_chain_s)
1966 self.failUnlessEqual(len(self.signature), 9)
1967 self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1968 {0: [self.signature]})
1970 expected_verification_key_offset = expected_signature_offset + len(self.signature)
1971 self.failUnlessEqual(len(self.verification_key), 6)
1972 self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1973 {0: [self.verification_key]})
1975 signable = mw.get_signable()
1976 verno, seq, roothash, k, n, segsize, datalen = \
1977 struct.unpack(">BQ32sBBQQ",
1979 self.failUnlessEqual(verno, 1)
1980 self.failUnlessEqual(seq, 0)
1981 self.failUnlessEqual(roothash, self.root_hash)
1982 self.failUnlessEqual(k, 3)
1983 self.failUnlessEqual(n, 10)
1984 self.failUnlessEqual(segsize, 6)
1985 self.failUnlessEqual(datalen, 36)
1986 expected_eof_offset = expected_block_hash_offset + \
1987 len(self.block_hash_tree_s)
1989 # Check the version number to make sure that it is correct.
1990 expected_version_number = struct.pack(">B", 1)
1991 self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1992 {0: [expected_version_number]})
1993 # Check the sequence number to make sure that it is correct
1994 expected_sequence_number = struct.pack(">Q", 0)
1995 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1996 {0: [expected_sequence_number]})
1997 # Check that the encoding parameters (k, N, segement size, data
1998 # length) are what they should be. These are 3, 10, 6, 36
1999 expected_k = struct.pack(">B", 3)
2000 self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2002 expected_n = struct.pack(">B", 10)
2003 self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2005 expected_segment_size = struct.pack(">Q", 6)
2006 self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2007 {0: [expected_segment_size]})
2008 expected_data_length = struct.pack(">Q", 36)
2009 self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2010 {0: [expected_data_length]})
2011 expected_offset = struct.pack(">Q", expected_private_key_offset)
2012 self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2013 {0: [expected_offset]})
2014 expected_offset = struct.pack(">Q", expected_share_hash_offset)
2015 self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2016 {0: [expected_offset]})
2017 expected_offset = struct.pack(">Q", expected_signature_offset)
2018 self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2019 {0: [expected_offset]})
2020 expected_offset = struct.pack(">Q", expected_verification_key_offset)
2021 self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2022 {0: [expected_offset]})
2023 expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2024 self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2025 {0: [expected_offset]})
2026 expected_offset = struct.pack(">Q", expected_sharedata_offset)
2027 self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2028 {0: [expected_offset]})
2029 expected_offset = struct.pack(">Q", expected_block_hash_offset)
2030 self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2031 {0: [expected_offset]})
2032 expected_offset = struct.pack(">Q", expected_eof_offset)
2033 self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2034 {0: [expected_offset]})
2035 d.addCallback(_check_publish)
2038 def _make_new_mw(self, si, share, datalength=36):
2039 # This is a file of size 36 bytes. Since it has a segment
2040 # size of 6, we know that it has 6 byte segments, which will
2041 # be split into blocks of 2 bytes because our FEC k
2043 mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2048 def test_write_rejected_with_too_many_blocks(self):
2049 mw = self._make_new_mw("si0", 0)
2051 # Try writing too many blocks. We should not be able to write
2053 # blocks into each share.
2054 d = defer.succeed(None)
2056 d.addCallback(lambda ignored, i=i:
2057 mw.put_block(self.block, i, self.salt))
2058 d.addCallback(lambda ignored:
2059 self.shouldFail(LayoutInvalid, "too many blocks",
2061 mw.put_block, self.block, 7, self.salt))
2065 def test_write_rejected_with_invalid_salt(self):
2066 # Try writing an invalid salt. Salts are 16 bytes -- any more or
2067 # less should cause an error.
2068 mw = self._make_new_mw("si1", 0)
2069 bad_salt = "a" * 17 # 17 bytes
2070 d = defer.succeed(None)
2071 d.addCallback(lambda ignored:
2072 self.shouldFail(LayoutInvalid, "test_invalid_salt",
2073 None, mw.put_block, self.block, 7, bad_salt))
2077 def test_write_rejected_with_invalid_root_hash(self):
2078 # Try writing an invalid root hash. This should be SHA256d, and
2079 # 32 bytes long as a result.
2080 mw = self._make_new_mw("si2", 0)
2081 # 17 bytes != 32 bytes
2082 invalid_root_hash = "a" * 17
2083 d = defer.succeed(None)
2084 # Before this test can work, we need to put some blocks + salts,
2085 # a block hash tree, and a share hash tree. Otherwise, we'll see
2086 # failures that match what we are looking for, but are caused by
2087 # the constraints imposed on operation ordering.
2089 d.addCallback(lambda ignored, i=i:
2090 mw.put_block(self.block, i, self.salt))
2091 d.addCallback(lambda ignored:
2092 mw.put_encprivkey(self.encprivkey))
2093 d.addCallback(lambda ignored:
2094 mw.put_blockhashes(self.block_hash_tree))
2095 d.addCallback(lambda ignored:
2096 mw.put_sharehashes(self.share_hash_chain))
2097 d.addCallback(lambda ignored:
2098 self.shouldFail(LayoutInvalid, "invalid root hash",
2099 None, mw.put_root_hash, invalid_root_hash))
2103 def test_write_rejected_with_invalid_blocksize(self):
2104 # The blocksize implied by the writer that we get from
2105 # _make_new_mw is 2bytes -- any more or any less than this
2106 # should be cause for failure, unless it is the tail segment, in
2107 # which case it may not be failure.
2109 mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2111 # 1 bytes != 2 bytes
2112 d = defer.succeed(None)
2113 d.addCallback(lambda ignored, invalid_block=invalid_block:
2114 self.shouldFail(LayoutInvalid, "test blocksize too small",
2115 None, mw.put_block, invalid_block, 0,
2117 invalid_block = invalid_block * 3
2118 # 3 bytes != 2 bytes
2119 d.addCallback(lambda ignored:
2120 self.shouldFail(LayoutInvalid, "test blocksize too large",
2122 mw.put_block, invalid_block, 0, self.salt))
2124 d.addCallback(lambda ignored, i=i:
2125 mw.put_block(self.block, i, self.salt))
2126 # Try to put an invalid tail segment
2127 d.addCallback(lambda ignored:
2128 self.shouldFail(LayoutInvalid, "test invalid tail segment",
2130 mw.put_block, self.block, 5, self.salt))
2132 d.addCallback(lambda ignored:
2133 mw.put_block(valid_block, 5, self.salt))
2137 def test_write_enforces_order_constraints(self):
2138 # We require that the MDMFSlotWriteProxy be interacted with in a
2142 # 1: write blocks and salts
2143 # 2: Write the encrypted private key
2144 # 3: Write the block hashes
2145 # 4: Write the share hashes
2146 # 5: Write the root hash and salt hash
2147 # 6: Write the signature and verification key
2148 # 7: Write the file.
2150 # Some of these can be performed out-of-order, and some can't.
2151 # The dependencies that I want to test here are:
2152 # - Private key before block hashes
2153 # - share hashes and block hashes before root hash
2154 # - root hash before signature
2155 # - signature before verification key
2156 mw0 = self._make_new_mw("si0", 0)
2158 d = defer.succeed(None)
2160 d.addCallback(lambda ignored, i=i:
2161 mw0.put_block(self.block, i, self.salt))
2163 # Try to write the share hash chain without writing the
2164 # encrypted private key
2165 d.addCallback(lambda ignored:
2166 self.shouldFail(LayoutInvalid, "share hash chain before "
2169 mw0.put_sharehashes, self.share_hash_chain))
2170 # Write the private key.
2171 d.addCallback(lambda ignored:
2172 mw0.put_encprivkey(self.encprivkey))
2174 # Now write the block hashes and try again
2175 d.addCallback(lambda ignored:
2176 mw0.put_blockhashes(self.block_hash_tree))
2178 # We haven't yet put the root hash on the share, so we shouldn't
2179 # be able to sign it.
2180 d.addCallback(lambda ignored:
2181 self.shouldFail(LayoutInvalid, "signature before root hash",
2182 None, mw0.put_signature, self.signature))
2184 d.addCallback(lambda ignored:
2185 self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2187 # ..and, since that fails, we also shouldn't be able to put the
2189 d.addCallback(lambda ignored:
2190 self.shouldFail(LayoutInvalid, "key before signature",
2191 None, mw0.put_verification_key,
2192 self.verification_key))
2194 # Now write the share hashes.
2195 d.addCallback(lambda ignored:
2196 mw0.put_sharehashes(self.share_hash_chain))
2197 # We should be able to write the root hash now too
2198 d.addCallback(lambda ignored:
2199 mw0.put_root_hash(self.root_hash))
2201 # We should still be unable to put the verification key
2202 d.addCallback(lambda ignored:
2203 self.shouldFail(LayoutInvalid, "key before signature",
2204 None, mw0.put_verification_key,
2205 self.verification_key))
2207 d.addCallback(lambda ignored:
2208 mw0.put_signature(self.signature))
2210 # We shouldn't be able to write the offsets to the remote server
2211 # until the offset table is finished; IOW, until we have written
2212 # the verification key.
2213 d.addCallback(lambda ignored:
2214 self.shouldFail(LayoutInvalid, "offsets before verification key",
2216 mw0.finish_publishing))
2218 d.addCallback(lambda ignored:
2219 mw0.put_verification_key(self.verification_key))
2223 def test_end_to_end(self):
2224 mw = self._make_new_mw("si1", 0)
2225 # Write a share using the mutable writer, and make sure that the
2226 # reader knows how to read everything back to us.
2227 d = defer.succeed(None)
2229 d.addCallback(lambda ignored, i=i:
2230 mw.put_block(self.block, i, self.salt))
2231 d.addCallback(lambda ignored:
2232 mw.put_encprivkey(self.encprivkey))
2233 d.addCallback(lambda ignored:
2234 mw.put_blockhashes(self.block_hash_tree))
2235 d.addCallback(lambda ignored:
2236 mw.put_sharehashes(self.share_hash_chain))
2237 d.addCallback(lambda ignored:
2238 mw.put_root_hash(self.root_hash))
2239 d.addCallback(lambda ignored:
2240 mw.put_signature(self.signature))
2241 d.addCallback(lambda ignored:
2242 mw.put_verification_key(self.verification_key))
2243 d.addCallback(lambda ignored:
2244 mw.finish_publishing())
2246 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2247 def _check_block_and_salt((block, salt)):
2248 self.failUnlessEqual(block, self.block)
2249 self.failUnlessEqual(salt, self.salt)
2252 d.addCallback(lambda ignored, i=i:
2253 mr.get_block_and_salt(i))
2254 d.addCallback(_check_block_and_salt)
2256 d.addCallback(lambda ignored:
2257 mr.get_encprivkey())
2258 d.addCallback(lambda encprivkey:
2259 self.failUnlessEqual(self.encprivkey, encprivkey))
2261 d.addCallback(lambda ignored:
2262 mr.get_blockhashes())
2263 d.addCallback(lambda blockhashes:
2264 self.failUnlessEqual(self.block_hash_tree, blockhashes))
2266 d.addCallback(lambda ignored:
2267 mr.get_sharehashes())
2268 d.addCallback(lambda sharehashes:
2269 self.failUnlessEqual(self.share_hash_chain, sharehashes))
2271 d.addCallback(lambda ignored:
2273 d.addCallback(lambda signature:
2274 self.failUnlessEqual(signature, self.signature))
2276 d.addCallback(lambda ignored:
2277 mr.get_verification_key())
2278 d.addCallback(lambda verification_key:
2279 self.failUnlessEqual(verification_key, self.verification_key))
2281 d.addCallback(lambda ignored:
2283 d.addCallback(lambda seqnum:
2284 self.failUnlessEqual(seqnum, 0))
2286 d.addCallback(lambda ignored:
2288 d.addCallback(lambda root_hash:
2289 self.failUnlessEqual(self.root_hash, root_hash))
2291 d.addCallback(lambda ignored:
2292 mr.get_encoding_parameters())
2293 def _check_encoding_parameters((k, n, segsize, datalen)):
2294 self.failUnlessEqual(k, 3)
2295 self.failUnlessEqual(n, 10)
2296 self.failUnlessEqual(segsize, 6)
2297 self.failUnlessEqual(datalen, 36)
2298 d.addCallback(_check_encoding_parameters)
2300 d.addCallback(lambda ignored:
2301 mr.get_checkstring())
2302 d.addCallback(lambda checkstring:
2303 self.failUnlessEqual(checkstring, mw.get_checkstring()))
2307 def test_is_sdmf(self):
2308 # The MDMFSlotReadProxy should also know how to read SDMF files,
2309 # since it will encounter them on the grid. Callers use the
2310 # is_sdmf method to test this.
2311 self.write_sdmf_share_to_server("si1")
2312 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2314 d.addCallback(lambda issdmf:
2315 self.failUnless(issdmf))
2319 def test_reads_sdmf(self):
2320 # The slot read proxy should, naturally, know how to tell us
2321 # about data in the SDMF format
2322 self.write_sdmf_share_to_server("si1")
2323 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2324 d = defer.succeed(None)
2325 d.addCallback(lambda ignored:
2327 d.addCallback(lambda issdmf:
2328 self.failUnless(issdmf))
2330 # What do we need to read?
2333 d.addCallback(lambda ignored:
2334 mr.get_block_and_salt(0))
2335 def _check_block_and_salt(results):
2336 block, salt = results
2337 # Our original file is 36 bytes long. Then each share is 12
2338 # bytes in size. The share is composed entirely of the
2339 # letter a. self.block contains 2 as, so 6 * self.block is
2340 # what we are looking for.
2341 self.failUnlessEqual(block, self.block * 6)
2342 self.failUnlessEqual(salt, self.salt)
2343 d.addCallback(_check_block_and_salt)
2346 d.addCallback(lambda ignored:
2347 mr.get_blockhashes())
2348 d.addCallback(lambda blockhashes:
2349 self.failUnlessEqual(self.block_hash_tree,
2353 d.addCallback(lambda ignored:
2354 mr.get_sharehashes())
2355 d.addCallback(lambda sharehashes:
2356 self.failUnlessEqual(self.share_hash_chain,
2359 d.addCallback(lambda ignored:
2360 mr.get_encprivkey())
2361 d.addCallback(lambda encprivkey:
2362 self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2363 d.addCallback(lambda ignored:
2364 mr.get_verification_key())
2365 d.addCallback(lambda verification_key:
2366 self.failUnlessEqual(verification_key,
2367 self.verification_key,
2370 d.addCallback(lambda ignored:
2372 d.addCallback(lambda signature:
2373 self.failUnlessEqual(signature, self.signature, signature))
2375 # - The sequence number
2376 d.addCallback(lambda ignored:
2378 d.addCallback(lambda seqnum:
2379 self.failUnlessEqual(seqnum, 0, seqnum))
2382 d.addCallback(lambda ignored:
2384 d.addCallback(lambda root_hash:
2385 self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2389 def test_only_reads_one_segment_sdmf(self):
2390 # SDMF shares have only one segment, so it doesn't make sense to
2391 # read more segments than that. The reader should know this and
2392 # complain if we try to do that.
2393 self.write_sdmf_share_to_server("si1")
2394 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2395 d = defer.succeed(None)
2396 d.addCallback(lambda ignored:
2398 d.addCallback(lambda issdmf:
2399 self.failUnless(issdmf))
2400 d.addCallback(lambda ignored:
2401 self.shouldFail(LayoutInvalid, "test bad segment",
2403 mr.get_block_and_salt, 1))
2407 def test_read_with_prefetched_mdmf_data(self):
2408 # The MDMFSlotReadProxy will prefill certain fields if you pass
2409 # it data that you have already fetched. This is useful for
2410 # cases like the Servermap, which prefetches ~2kb of data while
2411 # finding out which shares are on the remote peer so that it
2412 # doesn't waste round trips.
2413 mdmf_data = self.build_test_mdmf_share()
2414 self.write_test_share_to_server("si1")
2415 def _make_mr(ignored, length):
2416 mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2419 d = defer.succeed(None)
2420 # This should be enough to fill in both the encoding parameters
2421 # and the table of offsets, which will complete the version
2422 # information tuple.
2423 d.addCallback(_make_mr, 123)
2424 d.addCallback(lambda mr:
2426 def _check_verinfo(verinfo):
2427 self.failUnless(verinfo)
2428 self.failUnlessEqual(len(verinfo), 9)
2438 self.failUnlessEqual(seqnum, 0)
2439 self.failUnlessEqual(root_hash, self.root_hash)
2440 self.failUnlessEqual(segsize, 6)
2441 self.failUnlessEqual(datalen, 36)
2442 self.failUnlessEqual(k, 3)
2443 self.failUnlessEqual(n, 10)
2444 expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2452 self.failUnlessEqual(expected_prefix, prefix)
2453 self.failUnlessEqual(self.rref.read_count, 0)
2454 d.addCallback(_check_verinfo)
2455 # This is not enough data to read a block and a share, so the
2456 # wrapper should attempt to read this from the remote server.
2457 d.addCallback(_make_mr, 123)
2458 d.addCallback(lambda mr:
2459 mr.get_block_and_salt(0))
2460 def _check_block_and_salt((block, salt)):
2461 self.failUnlessEqual(block, self.block)
2462 self.failUnlessEqual(salt, self.salt)
2463 self.failUnlessEqual(self.rref.read_count, 1)
2464 # This should be enough data to read one block.
2465 d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2466 d.addCallback(lambda mr:
2467 mr.get_block_and_salt(0))
2468 d.addCallback(_check_block_and_salt)
2472 def test_read_with_prefetched_sdmf_data(self):
2473 sdmf_data = self.build_test_sdmf_share()
2474 self.write_sdmf_share_to_server("si1")
2475 def _make_mr(ignored, length):
2476 mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2479 d = defer.succeed(None)
2480 # This should be enough to get us the encoding parameters,
2481 # offset table, and everything else we need to build a verinfo
2483 d.addCallback(_make_mr, 123)
2484 d.addCallback(lambda mr:
2486 def _check_verinfo(verinfo):
2487 self.failUnless(verinfo)
2488 self.failUnlessEqual(len(verinfo), 9)
2498 self.failUnlessEqual(seqnum, 0)
2499 self.failUnlessEqual(root_hash, self.root_hash)
2500 self.failUnlessEqual(salt, self.salt)
2501 self.failUnlessEqual(segsize, 36)
2502 self.failUnlessEqual(datalen, 36)
2503 self.failUnlessEqual(k, 3)
2504 self.failUnlessEqual(n, 10)
2505 expected_prefix = struct.pack(SIGNED_PREFIX,
2514 self.failUnlessEqual(expected_prefix, prefix)
2515 self.failUnlessEqual(self.rref.read_count, 0)
2516 d.addCallback(_check_verinfo)
2517 # This shouldn't be enough to read any share data.
2518 d.addCallback(_make_mr, 123)
2519 d.addCallback(lambda mr:
2520 mr.get_block_and_salt(0))
2521 def _check_block_and_salt((block, salt)):
2522 self.failUnlessEqual(block, self.block * 6)
2523 self.failUnlessEqual(salt, self.salt)
2524 # TODO: Fix the read routine so that it reads only the data
2525 # that it has cached if it can't read all of it.
2526 self.failUnlessEqual(self.rref.read_count, 2)
2528 # This should be enough to read share data.
2529 d.addCallback(_make_mr, self.offsets['share_data'])
2530 d.addCallback(lambda mr:
2531 mr.get_block_and_salt(0))
2532 d.addCallback(_check_block_and_salt)
2536 def test_read_with_empty_mdmf_file(self):
2537 # Some tests upload a file with no contents to test things
2538 # unrelated to the actual handling of the content of the file.
2539 # The reader should behave intelligently in these cases.
2540 self.write_test_share_to_server("si1", empty=True)
2541 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2542 # We should be able to get the encoding parameters, and they
2543 # should be correct.
2544 d = defer.succeed(None)
2545 d.addCallback(lambda ignored:
2546 mr.get_encoding_parameters())
2547 def _check_encoding_parameters(params):
2548 self.failUnlessEqual(len(params), 4)
2549 k, n, segsize, datalen = params
2550 self.failUnlessEqual(k, 3)
2551 self.failUnlessEqual(n, 10)
2552 self.failUnlessEqual(segsize, 0)
2553 self.failUnlessEqual(datalen, 0)
2554 d.addCallback(_check_encoding_parameters)
2556 # We should not be able to fetch a block, since there are no
2558 d.addCallback(lambda ignored:
2559 self.shouldFail(LayoutInvalid, "get block on empty file",
2561 mr.get_block_and_salt, 0))
2565 def test_read_with_empty_sdmf_file(self):
2566 self.write_sdmf_share_to_server("si1", empty=True)
2567 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2568 # We should be able to get the encoding parameters, and they
2570 d = defer.succeed(None)
2571 d.addCallback(lambda ignored:
2572 mr.get_encoding_parameters())
2573 def _check_encoding_parameters(params):
2574 self.failUnlessEqual(len(params), 4)
2575 k, n, segsize, datalen = params
2576 self.failUnlessEqual(k, 3)
2577 self.failUnlessEqual(n, 10)
2578 self.failUnlessEqual(segsize, 0)
2579 self.failUnlessEqual(datalen, 0)
2580 d.addCallback(_check_encoding_parameters)
2582 # It does not make sense to get a block in this format, so we
2583 # should not be able to.
2584 d.addCallback(lambda ignored:
2585 self.shouldFail(LayoutInvalid, "get block on an empty file",
2587 mr.get_block_and_salt, 0))
2591 def test_verinfo_with_sdmf_file(self):
2592 self.write_sdmf_share_to_server("si1")
2593 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2594 # We should be able to get the version information.
2595 d = defer.succeed(None)
2596 d.addCallback(lambda ignored:
2598 def _check_verinfo(verinfo):
2599 self.failUnless(verinfo)
2600 self.failUnlessEqual(len(verinfo), 9)
2610 self.failUnlessEqual(seqnum, 0)
2611 self.failUnlessEqual(root_hash, self.root_hash)
2612 self.failUnlessEqual(salt, self.salt)
2613 self.failUnlessEqual(segsize, 36)
2614 self.failUnlessEqual(datalen, 36)
2615 self.failUnlessEqual(k, 3)
2616 self.failUnlessEqual(n, 10)
2617 expected_prefix = struct.pack(">BQ32s16s BBQQ",
2626 self.failUnlessEqual(prefix, expected_prefix)
2627 self.failUnlessEqual(offsets, self.offsets)
2628 d.addCallback(_check_verinfo)
2632 def test_verinfo_with_mdmf_file(self):
2633 self.write_test_share_to_server("si1")
2634 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2635 d = defer.succeed(None)
2636 d.addCallback(lambda ignored:
2638 def _check_verinfo(verinfo):
2639 self.failUnless(verinfo)
2640 self.failUnlessEqual(len(verinfo), 9)
2650 self.failUnlessEqual(seqnum, 0)
2651 self.failUnlessEqual(root_hash, self.root_hash)
2653 self.failUnlessEqual(segsize, 6)
2654 self.failUnlessEqual(datalen, 36)
2655 self.failUnlessEqual(k, 3)
2656 self.failUnlessEqual(n, 10)
2657 expected_prefix = struct.pack(">BQ32s BBQQ",
2665 self.failUnlessEqual(prefix, expected_prefix)
2666 self.failUnlessEqual(offsets, self.offsets)
2667 d.addCallback(_check_verinfo)
2671 def test_sdmf_writer(self):
2672 # Go through the motions of writing an SDMF share to the storage
2673 # server. Then read the storage server to see that the share got
2674 # written in the way that we think it should have.
2676 # We do this first so that the necessary instance variables get
2677 # set the way we want them for the tests below.
2678 data = self.build_test_sdmf_share()
2679 sdmfr = SDMFSlotWriteProxy(0,
2684 # Put the block and salt.
2685 sdmfr.put_block(self.blockdata, 0, self.salt)
2687 # Put the encprivkey
2688 sdmfr.put_encprivkey(self.encprivkey)
2690 # Put the block and share hash chains
2691 sdmfr.put_blockhashes(self.block_hash_tree)
2692 sdmfr.put_sharehashes(self.share_hash_chain)
2693 sdmfr.put_root_hash(self.root_hash)
2696 sdmfr.put_signature(self.signature)
2698 # Put the verification key
2699 sdmfr.put_verification_key(self.verification_key)
2701 # Now check to make sure that nothing has been written yet.
2702 self.failUnlessEqual(self.rref.write_count, 0)
2704 # Now finish publishing
2705 d = sdmfr.finish_publishing()
2707 self.failUnlessEqual(self.rref.write_count, 1)
2708 read = self.ss.remote_slot_readv
2709 self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2711 d.addCallback(_then)
2715 def test_sdmf_writer_preexisting_share(self):
2716 data = self.build_test_sdmf_share()
2717 self.write_sdmf_share_to_server("si1")
2719 # Now there is a share on the storage server. To successfully
2720 # write, we need to set the checkstring correctly. When we
2721 # don't, no write should occur.
2722 sdmfw = SDMFSlotWriteProxy(0,
2727 sdmfw.put_block(self.blockdata, 0, self.salt)
2729 # Put the encprivkey
2730 sdmfw.put_encprivkey(self.encprivkey)
2732 # Put the block and share hash chains
2733 sdmfw.put_blockhashes(self.block_hash_tree)
2734 sdmfw.put_sharehashes(self.share_hash_chain)
2737 sdmfw.put_root_hash(self.root_hash)
2740 sdmfw.put_signature(self.signature)
2742 # Put the verification key
2743 sdmfw.put_verification_key(self.verification_key)
2745 # We shouldn't have a checkstring yet
2746 self.failUnlessEqual(sdmfw.get_checkstring(), "")
2748 d = sdmfw.finish_publishing()
2750 self.failIf(results[0])
2751 # this is the correct checkstring
2752 self._expected_checkstring = results[1][0][0]
2753 return self._expected_checkstring
2755 d.addCallback(_then)
2756 d.addCallback(sdmfw.set_checkstring)
2757 d.addCallback(lambda ignored:
2758 sdmfw.get_checkstring())
2759 d.addCallback(lambda checkstring:
2760 self.failUnlessEqual(checkstring, self._expected_checkstring))
2761 d.addCallback(lambda ignored:
2762 sdmfw.finish_publishing())
2763 def _then_again(results):
2764 self.failUnless(results[0])
2765 read = self.ss.remote_slot_readv
2766 self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2767 {0: [struct.pack(">Q", 1)]})
2768 self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2770 d.addCallback(_then_again)
2774 class Stats(unittest.TestCase):
2777 self.sparent = LoggingServiceParent()
2778 self._lease_secret = itertools.count()
2780 return self.sparent.stopService()
2782 def workdir(self, name):
2783 basedir = os.path.join("storage", "Server", name)
2786 def create(self, name):
2787 workdir = self.workdir(name)
2788 ss = StorageServer(workdir, "\x00" * 20)
2789 ss.setServiceParent(self.sparent)
2792 def test_latencies(self):
2793 ss = self.create("test_latencies")
2794 for i in range(10000):
2795 ss.add_latency("allocate", 1.0 * i)
2796 for i in range(1000):
2797 ss.add_latency("renew", 1.0 * i)
2799 ss.add_latency("write", 1.0 * i)
2801 ss.add_latency("cancel", 2.0 * i)
2802 ss.add_latency("get", 5.0)
2804 output = ss.get_latencies()
2806 self.failUnlessEqual(sorted(output.keys()),
2807 sorted(["allocate", "renew", "cancel", "write", "get"]))
2808 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2809 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2810 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2811 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2812 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2813 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2814 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2815 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2816 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2818 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2819 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2820 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
2821 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2822 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2823 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2824 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2825 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2826 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2828 self.failUnlessEqual(len(ss.latencies["write"]), 20)
2829 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2830 self.failUnless(output["write"]["01_0_percentile"] is None, output)
2831 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
2832 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2833 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2834 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2835 self.failUnless(output["write"]["99_0_percentile"] is None, output)
2836 self.failUnless(output["write"]["99_9_percentile"] is None, output)
2838 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2839 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2840 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2841 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
2842 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2843 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2844 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2845 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2846 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2848 self.failUnlessEqual(len(ss.latencies["get"]), 1)
2849 self.failUnless(output["get"]["mean"] is None, output)
2850 self.failUnless(output["get"]["01_0_percentile"] is None, output)
2851 self.failUnless(output["get"]["10_0_percentile"] is None, output)
2852 self.failUnless(output["get"]["50_0_percentile"] is None, output)
2853 self.failUnless(output["get"]["90_0_percentile"] is None, output)
2854 self.failUnless(output["get"]["95_0_percentile"] is None, output)
2855 self.failUnless(output["get"]["99_0_percentile"] is None, output)
2856 self.failUnless(output["get"]["99_9_percentile"] is None, output)
2859 s = re.sub(r'<[^>]*>', ' ', s)
2860 s = re.sub(r'\s+', ' ', s)
2863 class MyBucketCountingCrawler(BucketCountingCrawler):
2864 def finished_prefix(self, cycle, prefix):
2865 BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2867 d = self.hook_ds.pop(0)
2870 class MyStorageServer(StorageServer):
2871 def add_bucket_counter(self):
2872 statefile = os.path.join(self.storedir, "bucket_counter.state")
2873 self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2874 self.bucket_counter.setServiceParent(self)
2876 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2879 self.s = service.MultiService()
2880 self.s.startService()
2882 return self.s.stopService()
2884 def test_bucket_counter(self):
2885 basedir = "storage/BucketCounter/bucket_counter"
2886 fileutil.make_dirs(basedir)
2887 ss = StorageServer(basedir, "\x00" * 20)
2888 # to make sure we capture the bucket-counting-crawler in the middle
2889 # of a cycle, we reach in and reduce its maximum slice time to 0. We
2890 # also make it start sooner than usual.
2891 ss.bucket_counter.slow_start = 0
2892 orig_cpu_slice = ss.bucket_counter.cpu_slice
2893 ss.bucket_counter.cpu_slice = 0
2894 ss.setServiceParent(self.s)
2896 w = StorageStatus(ss)
2898 # this sample is before the crawler has started doing anything
2899 html = w.renderSynchronously()
2900 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2901 s = remove_tags(html)
2902 self.failUnlessIn("Accepting new shares: Yes", s)
2903 self.failUnlessIn("Reserved space: - 0 B (0)", s)
2904 self.failUnlessIn("Total buckets: Not computed yet", s)
2905 self.failUnlessIn("Next crawl in", s)
2907 # give the bucket-counting-crawler one tick to get started. The
2908 # cpu_slice=0 will force it to yield right after it processes the
2911 d = fireEventually()
2912 def _check(ignored):
2913 # are we really right after the first prefix?
2914 state = ss.bucket_counter.get_state()
2915 if state["last-complete-prefix"] is None:
2916 d2 = fireEventually()
2917 d2.addCallback(_check)
2919 self.failUnlessEqual(state["last-complete-prefix"],
2920 ss.bucket_counter.prefixes[0])
2921 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2922 html = w.renderSynchronously()
2923 s = remove_tags(html)
2924 self.failUnlessIn(" Current crawl ", s)
2925 self.failUnlessIn(" (next work in ", s)
2926 d.addCallback(_check)
2928 # now give it enough time to complete a full cycle
2930 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2931 d.addCallback(lambda ignored: self.poll(_watch))
2932 def _check2(ignored):
2933 ss.bucket_counter.cpu_slice = orig_cpu_slice
2934 html = w.renderSynchronously()
2935 s = remove_tags(html)
2936 self.failUnlessIn("Total buckets: 0 (the number of", s)
2937 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2938 d.addCallback(_check2)
2941 def test_bucket_counter_cleanup(self):
2942 basedir = "storage/BucketCounter/bucket_counter_cleanup"
2943 fileutil.make_dirs(basedir)
2944 ss = StorageServer(basedir, "\x00" * 20)
2945 # to make sure we capture the bucket-counting-crawler in the middle
2946 # of a cycle, we reach in and reduce its maximum slice time to 0.
2947 ss.bucket_counter.slow_start = 0
2948 orig_cpu_slice = ss.bucket_counter.cpu_slice
2949 ss.bucket_counter.cpu_slice = 0
2950 ss.setServiceParent(self.s)
2952 d = fireEventually()
2954 def _after_first_prefix(ignored):
2955 state = ss.bucket_counter.state
2956 if state["last-complete-prefix"] is None:
2957 d2 = fireEventually()
2958 d2.addCallback(_after_first_prefix)
2960 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2961 # now sneak in and mess with its state, to make sure it cleans up
2962 # properly at the end of the cycle
2963 self.failUnlessEqual(state["last-complete-prefix"],
2964 ss.bucket_counter.prefixes[0])
2965 state["bucket-counts"][-12] = {}
2966 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2967 ss.bucket_counter.save_state()
2968 d.addCallback(_after_first_prefix)
2970 # now give it enough time to complete a cycle
2972 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2973 d.addCallback(lambda ignored: self.poll(_watch))
2974 def _check2(ignored):
2975 ss.bucket_counter.cpu_slice = orig_cpu_slice
2976 s = ss.bucket_counter.get_state()
2977 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2978 self.failIf("bogusprefix!" in s["storage-index-samples"],
2979 s["storage-index-samples"].keys())
2980 d.addCallback(_check2)
2983 def test_bucket_counter_eta(self):
2984 basedir = "storage/BucketCounter/bucket_counter_eta"
2985 fileutil.make_dirs(basedir)
2986 ss = MyStorageServer(basedir, "\x00" * 20)
2987 ss.bucket_counter.slow_start = 0
2988 # these will be fired inside finished_prefix()
2989 hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2990 w = StorageStatus(ss)
2992 d = defer.Deferred()
2994 def _check_1(ignored):
2995 # no ETA is available yet
2996 html = w.renderSynchronously()
2997 s = remove_tags(html)
2998 self.failUnlessIn("complete (next work", s)
3000 def _check_2(ignored):
3001 # one prefix has finished, so an ETA based upon that elapsed time
3002 # should be available.
3003 html = w.renderSynchronously()
3004 s = remove_tags(html)
3005 self.failUnlessIn("complete (ETA ", s)
3007 def _check_3(ignored):
3008 # two prefixes have finished
3009 html = w.renderSynchronously()
3010 s = remove_tags(html)
3011 self.failUnlessIn("complete (ETA ", s)
3014 hooks[0].addCallback(_check_1).addErrback(d.errback)
3015 hooks[1].addCallback(_check_2).addErrback(d.errback)
3016 hooks[2].addCallback(_check_3).addErrback(d.errback)
3018 ss.setServiceParent(self.s)
3021 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3022 stop_after_first_bucket = False
3023 def process_bucket(self, *args, **kwargs):
3024 LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3025 if self.stop_after_first_bucket:
3026 self.stop_after_first_bucket = False
3027 self.cpu_slice = -1.0
3028 def yielding(self, sleep_time):
3029 if not self.stop_after_first_bucket:
3030 self.cpu_slice = 500
3032 class BrokenStatResults:
3034 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3037 bsr = BrokenStatResults()
3038 for attrname in dir(s):
3039 if attrname.startswith("_"):
3041 if attrname == "st_blocks":
3043 setattr(bsr, attrname, getattr(s, attrname))
3046 class InstrumentedStorageServer(StorageServer):
3047 LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3048 class No_ST_BLOCKS_StorageServer(StorageServer):
3049 LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3051 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3054 self.s = service.MultiService()
3055 self.s.startService()
3057 return self.s.stopService()
3059 def make_shares(self, ss):
3061 return (si, hashutil.tagged_hash("renew", si),
3062 hashutil.tagged_hash("cancel", si))
3063 def make_mutable(si):
3064 return (si, hashutil.tagged_hash("renew", si),
3065 hashutil.tagged_hash("cancel", si),
3066 hashutil.tagged_hash("write-enabler", si))
3067 def make_extra_lease(si, num):
3068 return (hashutil.tagged_hash("renew-%d" % num, si),
3069 hashutil.tagged_hash("cancel-%d" % num, si))
3071 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3072 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3073 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3074 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3075 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3076 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3078 canary = FakeCanary()
3079 # note: 'tahoe debug dump-share' will not handle this file, since the
3080 # inner contents are not a valid CHK share
3081 data = "\xff" * 1000
3083 a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3085 w[0].remote_write(0, data)
3088 a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3090 w[0].remote_write(0, data)
3092 ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3094 writev = ss.remote_slot_testv_and_readv_and_writev
3095 writev(mutable_si_2, (we2, rs2, cs2),
3096 {0: ([], [(0,data)], len(data))}, [])
3097 writev(mutable_si_3, (we3, rs3, cs3),
3098 {0: ([], [(0,data)], len(data))}, [])
3099 ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3101 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3102 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3103 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3105 def test_basic(self):
3106 basedir = "storage/LeaseCrawler/basic"
3107 fileutil.make_dirs(basedir)
3108 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3109 # make it start sooner than usual.
3110 lc = ss.lease_checker
3113 lc.stop_after_first_bucket = True
3114 webstatus = StorageStatus(ss)
3116 # create a few shares, with some leases on them
3117 self.make_shares(ss)
3118 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3120 # add a non-sharefile to exercise another code path
3121 fn = os.path.join(ss.sharedir,
3122 storage_index_to_dir(immutable_si_0),
3125 f.write("I am not a share.\n")
3128 # this is before the crawl has started, so we're not in a cycle yet
3129 initial_state = lc.get_state()
3130 self.failIf(lc.get_progress()["cycle-in-progress"])
3131 self.failIfIn("cycle-to-date", initial_state)
3132 self.failIfIn("estimated-remaining-cycle", initial_state)
3133 self.failIfIn("estimated-current-cycle", initial_state)
3134 self.failUnlessIn("history", initial_state)
3135 self.failUnlessEqual(initial_state["history"], {})
3137 ss.setServiceParent(self.s)
3141 d = fireEventually()
3143 # now examine the state right after the first bucket has been
3145 def _after_first_bucket(ignored):
3146 initial_state = lc.get_state()
3147 if "cycle-to-date" not in initial_state:
3148 d2 = fireEventually()
3149 d2.addCallback(_after_first_bucket)
3151 self.failUnlessIn("cycle-to-date", initial_state)
3152 self.failUnlessIn("estimated-remaining-cycle", initial_state)
3153 self.failUnlessIn("estimated-current-cycle", initial_state)
3154 self.failUnlessIn("history", initial_state)
3155 self.failUnlessEqual(initial_state["history"], {})
3157 so_far = initial_state["cycle-to-date"]
3158 self.failUnlessEqual(so_far["expiration-enabled"], False)
3159 self.failUnlessIn("configured-expiration-mode", so_far)
3160 self.failUnlessIn("lease-age-histogram", so_far)
3161 lah = so_far["lease-age-histogram"]
3162 self.failUnlessEqual(type(lah), list)
3163 self.failUnlessEqual(len(lah), 1)
3164 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3165 self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3166 self.failUnlessEqual(so_far["corrupt-shares"], [])
3167 sr1 = so_far["space-recovered"]
3168 self.failUnlessEqual(sr1["examined-buckets"], 1)
3169 self.failUnlessEqual(sr1["examined-shares"], 1)
3170 self.failUnlessEqual(sr1["actual-shares"], 0)
3171 self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3172 self.failUnlessEqual(sr1["original-sharebytes"], 0)
3173 left = initial_state["estimated-remaining-cycle"]
3174 sr2 = left["space-recovered"]
3175 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3176 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3177 self.failIfEqual(sr2["actual-shares"], None)
3178 self.failIfEqual(sr2["configured-diskbytes"], None)
3179 self.failIfEqual(sr2["original-sharebytes"], None)
3180 d.addCallback(_after_first_bucket)
3181 d.addCallback(lambda ign: self.render1(webstatus))
3182 def _check_html_in_cycle(html):
3183 s = remove_tags(html)
3184 self.failUnlessIn("So far, this cycle has examined "
3185 "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3186 self.failUnlessIn("and has recovered: "
3187 "0 shares, 0 buckets (0 mutable / 0 immutable), "
3188 "0 B (0 B / 0 B)", s)
3189 self.failUnlessIn("If expiration were enabled, "
3190 "we would have recovered: "
3191 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3192 " 0 B (0 B / 0 B) by now", s)
3193 self.failUnlessIn("and the remainder of this cycle "
3194 "would probably recover: "
3195 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3196 " 0 B (0 B / 0 B)", s)
3197 self.failUnlessIn("and the whole cycle would probably recover: "
3198 "0 shares, 0 buckets (0 mutable / 0 immutable),"
3199 " 0 B (0 B / 0 B)", s)
3200 self.failUnlessIn("if we were strictly using each lease's default "
3201 "31-day lease lifetime", s)
3202 self.failUnlessIn("this cycle would be expected to recover: ", s)
3203 d.addCallback(_check_html_in_cycle)
3205 # wait for the crawler to finish the first cycle. Nothing should have
3208 return bool(lc.get_state()["last-cycle-finished"] is not None)
3209 d.addCallback(lambda ign: self.poll(_wait))
3211 def _after_first_cycle(ignored):
3213 self.failIf("cycle-to-date" in s)
3214 self.failIf("estimated-remaining-cycle" in s)
3215 self.failIf("estimated-current-cycle" in s)
3216 last = s["history"][0]
3217 self.failUnlessIn("cycle-start-finish-times", last)
3218 self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3219 self.failUnlessEqual(last["expiration-enabled"], False)
3220 self.failUnlessIn("configured-expiration-mode", last)
3222 self.failUnlessIn("lease-age-histogram", last)
3223 lah = last["lease-age-histogram"]
3224 self.failUnlessEqual(type(lah), list)
3225 self.failUnlessEqual(len(lah), 1)
3226 self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3228 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3229 self.failUnlessEqual(last["corrupt-shares"], [])
3231 rec = last["space-recovered"]
3232 self.failUnlessEqual(rec["examined-buckets"], 4)
3233 self.failUnlessEqual(rec["examined-shares"], 4)
3234 self.failUnlessEqual(rec["actual-buckets"], 0)
3235 self.failUnlessEqual(rec["original-buckets"], 0)
3236 self.failUnlessEqual(rec["configured-buckets"], 0)
3237 self.failUnlessEqual(rec["actual-shares"], 0)
3238 self.failUnlessEqual(rec["original-shares"], 0)
3239 self.failUnlessEqual(rec["configured-shares"], 0)
3240 self.failUnlessEqual(rec["actual-diskbytes"], 0)
3241 self.failUnlessEqual(rec["original-diskbytes"], 0)
3242 self.failUnlessEqual(rec["configured-diskbytes"], 0)
3243 self.failUnlessEqual(rec["actual-sharebytes"], 0)
3244 self.failUnlessEqual(rec["original-sharebytes"], 0)
3245 self.failUnlessEqual(rec["configured-sharebytes"], 0)
3247 def _get_sharefile(si):
3248 return list(ss._iter_share_files(si))[0]
3249 def count_leases(si):
3250 return len(list(_get_sharefile(si).get_leases()))
3251 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3252 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3253 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3254 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3255 d.addCallback(_after_first_cycle)
3256 d.addCallback(lambda ign: self.render1(webstatus))
3257 def _check_html(html):
3258 s = remove_tags(html)
3259 self.failUnlessIn("recovered: 0 shares, 0 buckets "
3260 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3261 self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3262 "(2 mutable / 2 immutable),", s)
3263 self.failUnlessIn("but expiration was not enabled", s)
3264 d.addCallback(_check_html)
3265 d.addCallback(lambda ign: self.render_json(webstatus))
3266 def _check_json(json):
3267 data = simplejson.loads(json)
3268 self.failUnlessIn("lease-checker", data)
3269 self.failUnlessIn("lease-checker-progress", data)
3270 d.addCallback(_check_json)
3273 def backdate_lease(self, sf, renew_secret, new_expire_time):
3274 # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3275 # "renew" a lease with a new_expire_time that is older than what the
3276 # current lease has), so we have to reach inside it.
3277 for i,lease in enumerate(sf.get_leases()):
3278 if lease.renew_secret == renew_secret:
3279 lease.expiration_time = new_expire_time
3280 f = open(sf.home, 'rb+')
3281 sf._write_lease_record(f, i, lease)
3284 raise IndexError("unable to renew non-existent lease")
3286 def test_expire_age(self):
3287 basedir = "storage/LeaseCrawler/expire_age"
3288 fileutil.make_dirs(basedir)
3289 # setting expiration_time to 2000 means that any lease which is more
3290 # than 2000s old will be expired.
3291 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3292 expiration_enabled=True,
3293 expiration_mode="age",
3294 expiration_override_lease_duration=2000)
3295 # make it start sooner than usual.
3296 lc = ss.lease_checker
3298 lc.stop_after_first_bucket = True
3299 webstatus = StorageStatus(ss)
3301 # create a few shares, with some leases on them
3302 self.make_shares(ss)
3303 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3305 def count_shares(si):
3306 return len(list(ss._iter_share_files(si)))
3307 def _get_sharefile(si):
3308 return list(ss._iter_share_files(si))[0]
3309 def count_leases(si):
3310 return len(list(_get_sharefile(si).get_leases()))
3312 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3313 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3314 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3315 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3316 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3317 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3318 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3319 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3321 # artificially crank back the expiration time on the first lease of
3322 # each share, to make it look like it expired already (age=1000s).
3323 # Some shares have an extra lease which is set to expire at the
3324 # default time in 31 days from now (age=31days). We then run the
3325 # crawler, which will expire the first lease, making some shares get
3326 # deleted and others stay alive (with one remaining lease)
3329 sf0 = _get_sharefile(immutable_si_0)
3330 self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3331 sf0_size = os.stat(sf0.home).st_size
3333 # immutable_si_1 gets an extra lease
3334 sf1 = _get_sharefile(immutable_si_1)
3335 self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3337 sf2 = _get_sharefile(mutable_si_2)
3338 self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3339 sf2_size = os.stat(sf2.home).st_size
3341 # mutable_si_3 gets an extra lease
3342 sf3 = _get_sharefile(mutable_si_3)
3343 self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3345 ss.setServiceParent(self.s)
3347 d = fireEventually()
3348 # examine the state right after the first bucket has been processed
3349 def _after_first_bucket(ignored):
3350 p = lc.get_progress()
3351 if not p["cycle-in-progress"]:
3352 d2 = fireEventually()
3353 d2.addCallback(_after_first_bucket)
3355 d.addCallback(_after_first_bucket)
3356 d.addCallback(lambda ign: self.render1(webstatus))
3357 def _check_html_in_cycle(html):
3358 s = remove_tags(html)
3359 # the first bucket encountered gets deleted, and its prefix
3360 # happens to be about 1/5th of the way through the ring, so the
3361 # predictor thinks we'll have 5 shares and that we'll delete them
3362 # all. This part of the test depends upon the SIs landing right
3363 # where they do now.
3364 self.failUnlessIn("The remainder of this cycle is expected to "
3365 "recover: 4 shares, 4 buckets", s)
3366 self.failUnlessIn("The whole cycle is expected to examine "
3367 "5 shares in 5 buckets and to recover: "
3368 "5 shares, 5 buckets", s)
3369 d.addCallback(_check_html_in_cycle)
3371 # wait for the crawler to finish the first cycle. Two shares should
3374 return bool(lc.get_state()["last-cycle-finished"] is not None)
3375 d.addCallback(lambda ign: self.poll(_wait))
3377 def _after_first_cycle(ignored):
3378 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3379 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3380 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3381 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3382 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3383 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3386 last = s["history"][0]
3388 self.failUnlessEqual(last["expiration-enabled"], True)
3389 self.failUnlessEqual(last["configured-expiration-mode"],
3390 ("age", 2000, None, ("mutable", "immutable")))
3391 self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3393 rec = last["space-recovered"]
3394 self.failUnlessEqual(rec["examined-buckets"], 4)
3395 self.failUnlessEqual(rec["examined-shares"], 4)
3396 self.failUnlessEqual(rec["actual-buckets"], 2)
3397 self.failUnlessEqual(rec["original-buckets"], 2)
3398 self.failUnlessEqual(rec["configured-buckets"], 2)
3399 self.failUnlessEqual(rec["actual-shares"], 2)
3400 self.failUnlessEqual(rec["original-shares"], 2)
3401 self.failUnlessEqual(rec["configured-shares"], 2)
3402 size = sf0_size + sf2_size
3403 self.failUnlessEqual(rec["actual-sharebytes"], size)
3404 self.failUnlessEqual(rec["original-sharebytes"], size)
3405 self.failUnlessEqual(rec["configured-sharebytes"], size)
3406 # different platforms have different notions of "blocks used by
3407 # this file", so merely assert that it's a number
3408 self.failUnless(rec["actual-diskbytes"] >= 0,
3409 rec["actual-diskbytes"])
3410 self.failUnless(rec["original-diskbytes"] >= 0,
3411 rec["original-diskbytes"])
3412 self.failUnless(rec["configured-diskbytes"] >= 0,
3413 rec["configured-diskbytes"])
3414 d.addCallback(_after_first_cycle)
3415 d.addCallback(lambda ign: self.render1(webstatus))
3416 def _check_html(html):
3417 s = remove_tags(html)
3418 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3419 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3420 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3421 d.addCallback(_check_html)
3424 def test_expire_cutoff_date(self):
3425 basedir = "storage/LeaseCrawler/expire_cutoff_date"
3426 fileutil.make_dirs(basedir)
3427 # setting cutoff-date to 2000 seconds ago means that any lease which
3428 # is more than 2000s old will be expired.
3430 then = int(now - 2000)
3431 ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3432 expiration_enabled=True,
3433 expiration_mode="cutoff-date",
3434 expiration_cutoff_date=then)
3435 # make it start sooner than usual.
3436 lc = ss.lease_checker
3438 lc.stop_after_first_bucket = True
3439 webstatus = StorageStatus(ss)
3441 # create a few shares, with some leases on them
3442 self.make_shares(ss)
3443 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3445 def count_shares(si):
3446 return len(list(ss._iter_share_files(si)))
3447 def _get_sharefile(si):
3448 return list(ss._iter_share_files(si))[0]
3449 def count_leases(si):
3450 return len(list(_get_sharefile(si).get_leases()))
3452 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3453 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3454 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3455 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3456 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3457 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3458 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3459 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3461 # artificially crank back the expiration time on the first lease of
3462 # each share, to make it look like was renewed 3000s ago. To achieve
3463 # this, we need to set the expiration time to now-3000+31days. This
3464 # will change when the lease format is improved to contain both
3465 # create/renew time and duration.
3466 new_expiration_time = now - 3000 + 31*24*60*60
3468 # Some shares have an extra lease which is set to expire at the
3469 # default time in 31 days from now (age=31days). We then run the
3470 # crawler, which will expire the first lease, making some shares get
3471 # deleted and others stay alive (with one remaining lease)
3473 sf0 = _get_sharefile(immutable_si_0)
3474 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3475 sf0_size = os.stat(sf0.home).st_size
3477 # immutable_si_1 gets an extra lease
3478 sf1 = _get_sharefile(immutable_si_1)
3479 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3481 sf2 = _get_sharefile(mutable_si_2)
3482 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3483 sf2_size = os.stat(sf2.home).st_size
3485 # mutable_si_3 gets an extra lease
3486 sf3 = _get_sharefile(mutable_si_3)
3487 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3489 ss.setServiceParent(self.s)
3491 d = fireEventually()
3492 # examine the state right after the first bucket has been processed
3493 def _after_first_bucket(ignored):
3494 p = lc.get_progress()
3495 if not p["cycle-in-progress"]:
3496 d2 = fireEventually()
3497 d2.addCallback(_after_first_bucket)
3499 d.addCallback(_after_first_bucket)
3500 d.addCallback(lambda ign: self.render1(webstatus))
3501 def _check_html_in_cycle(html):
3502 s = remove_tags(html)
3503 # the first bucket encountered gets deleted, and its prefix
3504 # happens to be about 1/5th of the way through the ring, so the
3505 # predictor thinks we'll have 5 shares and that we'll delete them
3506 # all. This part of the test depends upon the SIs landing right
3507 # where they do now.
3508 self.failUnlessIn("The remainder of this cycle is expected to "
3509 "recover: 4 shares, 4 buckets", s)
3510 self.failUnlessIn("The whole cycle is expected to examine "
3511 "5 shares in 5 buckets and to recover: "
3512 "5 shares, 5 buckets", s)
3513 d.addCallback(_check_html_in_cycle)
3515 # wait for the crawler to finish the first cycle. Two shares should
3518 return bool(lc.get_state()["last-cycle-finished"] is not None)
3519 d.addCallback(lambda ign: self.poll(_wait))
3521 def _after_first_cycle(ignored):
3522 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3523 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3524 self.failUnlessEqual(count_leases(immutable_si_1), 1)
3525 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3526 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3527 self.failUnlessEqual(count_leases(mutable_si_3), 1)
3530 last = s["history"][0]
3532 self.failUnlessEqual(last["expiration-enabled"], True)
3533 self.failUnlessEqual(last["configured-expiration-mode"],
3534 ("cutoff-date", None, then,
3535 ("mutable", "immutable")))
3536 self.failUnlessEqual(last["leases-per-share-histogram"],
3539 rec = last["space-recovered"]
3540 self.failUnlessEqual(rec["examined-buckets"], 4)
3541 self.failUnlessEqual(rec["examined-shares"], 4)
3542 self.failUnlessEqual(rec["actual-buckets"], 2)
3543 self.failUnlessEqual(rec["original-buckets"], 0)
3544 self.failUnlessEqual(rec["configured-buckets"], 2)
3545 self.failUnlessEqual(rec["actual-shares"], 2)
3546 self.failUnlessEqual(rec["original-shares"], 0)
3547 self.failUnlessEqual(rec["configured-shares"], 2)
3548 size = sf0_size + sf2_size
3549 self.failUnlessEqual(rec["actual-sharebytes"], size)
3550 self.failUnlessEqual(rec["original-sharebytes"], 0)
3551 self.failUnlessEqual(rec["configured-sharebytes"], size)
3552 # different platforms have different notions of "blocks used by
3553 # this file", so merely assert that it's a number
3554 self.failUnless(rec["actual-diskbytes"] >= 0,
3555 rec["actual-diskbytes"])
3556 self.failUnless(rec["original-diskbytes"] >= 0,
3557 rec["original-diskbytes"])
3558 self.failUnless(rec["configured-diskbytes"] >= 0,
3559 rec["configured-diskbytes"])
3560 d.addCallback(_after_first_cycle)
3561 d.addCallback(lambda ign: self.render1(webstatus))
3562 def _check_html(html):
3563 s = remove_tags(html)
3564 self.failUnlessIn("Expiration Enabled:"
3565 " expired leases will be removed", s)
3566 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3567 substr = "Leases created or last renewed before %s will be considered expired." % date
3568 self.failUnlessIn(substr, s)
3569 self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3570 d.addCallback(_check_html)
3573 def test_only_immutable(self):
3574 basedir = "storage/LeaseCrawler/only_immutable"
3575 fileutil.make_dirs(basedir)
3577 then = int(now - 2000)
3578 ss = StorageServer(basedir, "\x00" * 20,
3579 expiration_enabled=True,
3580 expiration_mode="cutoff-date",
3581 expiration_cutoff_date=then,
3582 expiration_sharetypes=("immutable",))
3583 lc = ss.lease_checker
3585 webstatus = StorageStatus(ss)
3587 self.make_shares(ss)
3588 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3589 # set all leases to be expirable
3590 new_expiration_time = now - 3000 + 31*24*60*60
3592 def count_shares(si):
3593 return len(list(ss._iter_share_files(si)))
3594 def _get_sharefile(si):
3595 return list(ss._iter_share_files(si))[0]
3596 def count_leases(si):
3597 return len(list(_get_sharefile(si).get_leases()))
3599 sf0 = _get_sharefile(immutable_si_0)
3600 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3601 sf1 = _get_sharefile(immutable_si_1)
3602 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3603 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3604 sf2 = _get_sharefile(mutable_si_2)
3605 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3606 sf3 = _get_sharefile(mutable_si_3)
3607 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3608 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3610 ss.setServiceParent(self.s)
3612 return bool(lc.get_state()["last-cycle-finished"] is not None)
3613 d = self.poll(_wait)
3615 def _after_first_cycle(ignored):
3616 self.failUnlessEqual(count_shares(immutable_si_0), 0)
3617 self.failUnlessEqual(count_shares(immutable_si_1), 0)
3618 self.failUnlessEqual(count_shares(mutable_si_2), 1)
3619 self.failUnlessEqual(count_leases(mutable_si_2), 1)
3620 self.failUnlessEqual(count_shares(mutable_si_3), 1)
3621 self.failUnlessEqual(count_leases(mutable_si_3), 2)
3622 d.addCallback(_after_first_cycle)
3623 d.addCallback(lambda ign: self.render1(webstatus))
3624 def _check_html(html):
3625 s = remove_tags(html)
3626 self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3627 d.addCallback(_check_html)
3630 def test_only_mutable(self):
3631 basedir = "storage/LeaseCrawler/only_mutable"
3632 fileutil.make_dirs(basedir)
3634 then = int(now - 2000)
3635 ss = StorageServer(basedir, "\x00" * 20,
3636 expiration_enabled=True,
3637 expiration_mode="cutoff-date",
3638 expiration_cutoff_date=then,
3639 expiration_sharetypes=("mutable",))
3640 lc = ss.lease_checker
3642 webstatus = StorageStatus(ss)
3644 self.make_shares(ss)
3645 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3646 # set all leases to be expirable
3647 new_expiration_time = now - 3000 + 31*24*60*60
3649 def count_shares(si):
3650 return len(list(ss._iter_share_files(si)))
3651 def _get_sharefile(si):
3652 return list(ss._iter_share_files(si))[0]
3653 def count_leases(si):
3654 return len(list(_get_sharefile(si).get_leases()))
3656 sf0 = _get_sharefile(immutable_si_0)
3657 self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3658 sf1 = _get_sharefile(immutable_si_1)
3659 self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3660 self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3661 sf2 = _get_sharefile(mutable_si_2)
3662 self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3663 sf3 = _get_sharefile(mutable_si_3)
3664 self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3665 self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3667 ss.setServiceParent(self.s)
3669 return bool(lc.get_state()["last-cycle-finished"] is not None)
3670 d = self.poll(_wait)
3672 def _after_first_cycle(ignored):
3673 self.failUnlessEqual(count_shares(immutable_si_0), 1)
3674 self.failUnlessEqual(count_leases(immutable_si_0), 1)
3675 self.failUnlessEqual(count_shares(immutable_si_1), 1)
3676 self.failUnlessEqual(count_leases(immutable_si_1), 2)
3677 self.failUnlessEqual(count_shares(mutable_si_2), 0)
3678 self.failUnlessEqual(count_shares(mutable_si_3), 0)
3679 d.addCallback(_after_first_cycle)
3680 d.addCallback(lambda ign: self.render1(webstatus))
3681 def _check_html(html):
3682 s = remove_tags(html)
3683 self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3684 d.addCallback(_check_html)
3687 def test_bad_mode(self):
3688 basedir = "storage/LeaseCrawler/bad_mode"
3689 fileutil.make_dirs(basedir)
3690 e = self.failUnlessRaises(ValueError,
3691 StorageServer, basedir, "\x00" * 20,
3692 expiration_mode="bogus")
3693 self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3695 def test_parse_duration(self):
3699 p = time_format.parse_duration
3700 self.failUnlessEqual(p("7days"), 7*DAY)
3701 self.failUnlessEqual(p("31day"), 31*DAY)
3702 self.failUnlessEqual(p("60 days"), 60*DAY)
3703 self.failUnlessEqual(p("2mo"), 2*MONTH)
3704 self.failUnlessEqual(p("3 month"), 3*MONTH)
3705 self.failUnlessEqual(p("2years"), 2*YEAR)
3706 e = self.failUnlessRaises(ValueError, p, "2kumquats")
3707 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3709 def test_parse_date(self):
3710 p = time_format.parse_date
3711 self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3712 self.failUnlessEqual(p("2009-03-18"), 1237334400)
3714 def test_limited_history(self):
3715 basedir = "storage/LeaseCrawler/limited_history"
3716 fileutil.make_dirs(basedir)
3717 ss = StorageServer(basedir, "\x00" * 20)
3718 # make it start sooner than usual.
3719 lc = ss.lease_checker
3723 # create a few shares, with some leases on them
3724 self.make_shares(ss)
3726 ss.setServiceParent(self.s)
3728 def _wait_until_15_cycles_done():
3729 last = lc.state["last-cycle-finished"]
3730 if last is not None and last >= 15:
3735 d = self.poll(_wait_until_15_cycles_done)
3737 def _check(ignored):
3740 self.failUnlessEqual(len(h), 10)
3741 self.failUnlessEqual(max(h.keys()), 15)
3742 self.failUnlessEqual(min(h.keys()), 6)
3743 d.addCallback(_check)
3746 def test_unpredictable_future(self):
3747 basedir = "storage/LeaseCrawler/unpredictable_future"
3748 fileutil.make_dirs(basedir)
3749 ss = StorageServer(basedir, "\x00" * 20)
3750 # make it start sooner than usual.
3751 lc = ss.lease_checker
3753 lc.cpu_slice = -1.0 # stop quickly
3755 self.make_shares(ss)
3757 ss.setServiceParent(self.s)
3759 d = fireEventually()
3760 def _check(ignored):
3761 # this should fire after the first bucket is complete, but before
3762 # the first prefix is complete, so the progress-measurer won't
3763 # think we've gotten far enough to raise our percent-complete
3764 # above 0%, triggering the cannot-predict-the-future code in
3765 # expirer.py . This will have to change if/when the
3766 # progress-measurer gets smart enough to count buckets (we'll
3767 # have to interrupt it even earlier, before it's finished the
3770 if "cycle-to-date" not in s:
3771 d2 = fireEventually()
3772 d2.addCallback(_check)
3774 self.failUnlessIn("cycle-to-date", s)
3775 self.failUnlessIn("estimated-remaining-cycle", s)
3776 self.failUnlessIn("estimated-current-cycle", s)
3778 left = s["estimated-remaining-cycle"]["space-recovered"]
3779 self.failUnlessEqual(left["actual-buckets"], None)
3780 self.failUnlessEqual(left["original-buckets"], None)
3781 self.failUnlessEqual(left["configured-buckets"], None)
3782 self.failUnlessEqual(left["actual-shares"], None)
3783 self.failUnlessEqual(left["original-shares"], None)
3784 self.failUnlessEqual(left["configured-shares"], None)
3785 self.failUnlessEqual(left["actual-diskbytes"], None)
3786 self.failUnlessEqual(left["original-diskbytes"], None)
3787 self.failUnlessEqual(left["configured-diskbytes"], None)
3788 self.failUnlessEqual(left["actual-sharebytes"], None)
3789 self.failUnlessEqual(left["original-sharebytes"], None)
3790 self.failUnlessEqual(left["configured-sharebytes"], None)
3792 full = s["estimated-remaining-cycle"]["space-recovered"]
3793 self.failUnlessEqual(full["actual-buckets"], None)
3794 self.failUnlessEqual(full["original-buckets"], None)
3795 self.failUnlessEqual(full["configured-buckets"], None)
3796 self.failUnlessEqual(full["actual-shares"], None)
3797 self.failUnlessEqual(full["original-shares"], None)
3798 self.failUnlessEqual(full["configured-shares"], None)
3799 self.failUnlessEqual(full["actual-diskbytes"], None)
3800 self.failUnlessEqual(full["original-diskbytes"], None)
3801 self.failUnlessEqual(full["configured-diskbytes"], None)
3802 self.failUnlessEqual(full["actual-sharebytes"], None)
3803 self.failUnlessEqual(full["original-sharebytes"], None)
3804 self.failUnlessEqual(full["configured-sharebytes"], None)
3806 d.addCallback(_check)
3809 def test_no_st_blocks(self):
3810 basedir = "storage/LeaseCrawler/no_st_blocks"
3811 fileutil.make_dirs(basedir)
3812 ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3813 expiration_mode="age",
3814 expiration_override_lease_duration=-1000)
3815 # a negative expiration_time= means the "configured-"
3816 # space-recovered counts will be non-zero, since all shares will have
3819 # make it start sooner than usual.
3820 lc = ss.lease_checker
3823 self.make_shares(ss)
3824 ss.setServiceParent(self.s)
3826 return bool(lc.get_state()["last-cycle-finished"] is not None)
3827 d = self.poll(_wait)
3829 def _check(ignored):
3831 last = s["history"][0]
3832 rec = last["space-recovered"]
3833 self.failUnlessEqual(rec["configured-buckets"], 4)
3834 self.failUnlessEqual(rec["configured-shares"], 4)
3835 self.failUnless(rec["configured-sharebytes"] > 0,
3836 rec["configured-sharebytes"])
3837 # without the .st_blocks field in os.stat() results, we should be
3838 # reporting diskbytes==sharebytes
3839 self.failUnlessEqual(rec["configured-sharebytes"],
3840 rec["configured-diskbytes"])
3841 d.addCallback(_check)
3844 def test_share_corruption(self):
3845 self._poll_should_ignore_these_errors = [
3846 UnknownMutableContainerVersionError,
3847 UnknownImmutableContainerVersionError,
3849 basedir = "storage/LeaseCrawler/share_corruption"
3850 fileutil.make_dirs(basedir)
3851 ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3852 w = StorageStatus(ss)
3853 # make it start sooner than usual.
3854 lc = ss.lease_checker
3855 lc.stop_after_first_bucket = True
3859 # create a few shares, with some leases on them
3860 self.make_shares(ss)
3862 # now corrupt one, and make sure the lease-checker keeps going
3863 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3864 first = min(self.sis)
3865 first_b32 = base32.b2a(first)
3866 fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3869 f.write("BAD MAGIC")
3871 # if get_share_file() doesn't see the correct mutable magic, it
3872 # assumes the file is an immutable share, and then
3873 # immutable.ShareFile sees a bad version. So regardless of which kind
3874 # of share we corrupted, this will trigger an
3875 # UnknownImmutableContainerVersionError.
3877 # also create an empty bucket
3878 empty_si = base32.b2a("\x04"*16)
3879 empty_bucket_dir = os.path.join(ss.sharedir,
3880 storage_index_to_dir(empty_si))
3881 fileutil.make_dirs(empty_bucket_dir)
3883 ss.setServiceParent(self.s)
3885 d = fireEventually()
3887 # now examine the state right after the first bucket has been
3889 def _after_first_bucket(ignored):
3891 if "cycle-to-date" not in s:
3892 d2 = fireEventually()
3893 d2.addCallback(_after_first_bucket)
3895 so_far = s["cycle-to-date"]
3896 rec = so_far["space-recovered"]
3897 self.failUnlessEqual(rec["examined-buckets"], 1)
3898 self.failUnlessEqual(rec["examined-shares"], 0)
3899 self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3900 d.addCallback(_after_first_bucket)
3902 d.addCallback(lambda ign: self.render_json(w))
3903 def _check_json(json):
3904 data = simplejson.loads(json)
3905 # grr. json turns all dict keys into strings.
3906 so_far = data["lease-checker"]["cycle-to-date"]
3907 corrupt_shares = so_far["corrupt-shares"]
3908 # it also turns all tuples into lists
3909 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3910 d.addCallback(_check_json)
3911 d.addCallback(lambda ign: self.render1(w))
3912 def _check_html(html):
3913 s = remove_tags(html)
3914 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3915 d.addCallback(_check_html)
3918 return bool(lc.get_state()["last-cycle-finished"] is not None)
3919 d.addCallback(lambda ign: self.poll(_wait))
3921 def _after_first_cycle(ignored):
3923 last = s["history"][0]
3924 rec = last["space-recovered"]
3925 self.failUnlessEqual(rec["examined-buckets"], 5)
3926 self.failUnlessEqual(rec["examined-shares"], 3)
3927 self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3928 d.addCallback(_after_first_cycle)
3929 d.addCallback(lambda ign: self.render_json(w))
3930 def _check_json_history(json):
3931 data = simplejson.loads(json)
3932 last = data["lease-checker"]["history"]["0"]
3933 corrupt_shares = last["corrupt-shares"]
3934 self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3935 d.addCallback(_check_json_history)
3936 d.addCallback(lambda ign: self.render1(w))
3937 def _check_html_history(html):
3938 s = remove_tags(html)
3939 self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3940 d.addCallback(_check_html_history)
3943 self.flushLoggedErrors(UnknownMutableContainerVersionError,
3944 UnknownImmutableContainerVersionError)
3949 def render_json(self, page):
3950 d = self.render1(page, args={"t": ["json"]})
3953 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3956 self.s = service.MultiService()
3957 self.s.startService()
3959 return self.s.stopService()
3961 def test_no_server(self):
3962 w = StorageStatus(None)
3963 html = w.renderSynchronously()
3964 self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3966 def test_status(self):
3967 basedir = "storage/WebStatus/status"
3968 fileutil.make_dirs(basedir)
3969 ss = StorageServer(basedir, "\x00" * 20)
3970 ss.setServiceParent(self.s)
3971 w = StorageStatus(ss)
3973 def _check_html(html):
3974 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3975 s = remove_tags(html)
3976 self.failUnlessIn("Accepting new shares: Yes", s)
3977 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3978 d.addCallback(_check_html)
3979 d.addCallback(lambda ign: self.render_json(w))
3980 def _check_json(json):
3981 data = simplejson.loads(json)
3983 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3984 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3985 self.failUnlessIn("bucket-counter", data)
3986 self.failUnlessIn("lease-checker", data)
3987 d.addCallback(_check_json)
3990 def render_json(self, page):
3991 d = self.render1(page, args={"t": ["json"]})
3994 @mock.patch('allmydata.util.fileutil.get_disk_stats')
3995 def test_status_no_disk_stats(self, mock_get_disk_stats):
3996 mock_get_disk_stats.side_effect = AttributeError()
3998 # Some platforms may have no disk stats API. Make sure the code can handle that
3999 # (test runs on all platforms).
4000 basedir = "storage/WebStatus/status_no_disk_stats"
4001 fileutil.make_dirs(basedir)
4002 ss = StorageServer(basedir, "\x00" * 20)
4003 ss.setServiceParent(self.s)
4004 w = StorageStatus(ss)
4005 html = w.renderSynchronously()
4006 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4007 s = remove_tags(html)
4008 self.failUnlessIn("Accepting new shares: Yes", s)
4009 self.failUnlessIn("Total disk space: ?", s)
4010 self.failUnlessIn("Space Available to Tahoe: ?", s)
4011 self.failUnless(ss.get_available_space() is None)
4013 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4014 def test_status_bad_disk_stats(self, mock_get_disk_stats):
4015 mock_get_disk_stats.side_effect = OSError()
4017 # If the API to get disk stats exists but a call to it fails, then the status should
4018 # show that no shares will be accepted, and get_available_space() should be 0.
4019 basedir = "storage/WebStatus/status_bad_disk_stats"
4020 fileutil.make_dirs(basedir)
4021 ss = StorageServer(basedir, "\x00" * 20)
4022 ss.setServiceParent(self.s)
4023 w = StorageStatus(ss)
4024 html = w.renderSynchronously()
4025 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4026 s = remove_tags(html)
4027 self.failUnlessIn("Accepting new shares: No", s)
4028 self.failUnlessIn("Total disk space: ?", s)
4029 self.failUnlessIn("Space Available to Tahoe: ?", s)
4030 self.failUnlessEqual(ss.get_available_space(), 0)
4032 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4033 def test_status_right_disk_stats(self, mock_get_disk_stats):
4036 free_for_root = 4*GB
4037 free_for_nonroot = 3*GB
4038 reserved_space = 1*GB
4039 used = total - free_for_root
4040 avail = max(free_for_nonroot - reserved_space, 0)
4041 mock_get_disk_stats.return_value = {
4043 'free_for_root': free_for_root,
4044 'free_for_nonroot': free_for_nonroot,
4049 basedir = "storage/WebStatus/status_right_disk_stats"
4050 fileutil.make_dirs(basedir)
4051 ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4052 expecteddir = ss.sharedir
4053 ss.setServiceParent(self.s)
4054 w = StorageStatus(ss)
4055 html = w.renderSynchronously()
4057 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4058 mock_get_disk_stats.call_args_list)
4060 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4061 s = remove_tags(html)
4062 self.failUnlessIn("Total disk space: 5.00 GB", s)
4063 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4064 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4065 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4066 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4067 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4068 self.failUnlessEqual(ss.get_available_space(), 2*GB)
4070 def test_readonly(self):
4071 basedir = "storage/WebStatus/readonly"
4072 fileutil.make_dirs(basedir)
4073 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4074 ss.setServiceParent(self.s)
4075 w = StorageStatus(ss)
4076 html = w.renderSynchronously()
4077 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4078 s = remove_tags(html)
4079 self.failUnlessIn("Accepting new shares: No", s)
4081 def test_reserved(self):
4082 basedir = "storage/WebStatus/reserved"
4083 fileutil.make_dirs(basedir)
4084 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4085 ss.setServiceParent(self.s)
4086 w = StorageStatus(ss)
4087 html = w.renderSynchronously()
4088 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4089 s = remove_tags(html)
4090 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4092 def test_huge_reserved(self):
4093 basedir = "storage/WebStatus/reserved"
4094 fileutil.make_dirs(basedir)
4095 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4096 ss.setServiceParent(self.s)
4097 w = StorageStatus(ss)
4098 html = w.renderSynchronously()
4099 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4100 s = remove_tags(html)
4101 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4103 def test_util(self):
4104 w = StorageStatus(None)
4105 self.failUnlessEqual(w.render_space(None, None), "?")
4106 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4107 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4108 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4109 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4110 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)