2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil, base32
9 from allmydata.storage import BucketWriter, BucketReader, \
10 StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError, LeaseInfo
12 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
14 from allmydata.interfaces import BadWriteEnablerError
15 from allmydata.test.common import LoggingServiceParent
def __init__(self, ignore_disconnectors=False):
    # Fake Foolscap canary used by the storage tests. When
    # ignore_disconnectors is True, disconnect-notification requests are
    # expected to be dropped (used by tests that allocate many writers).
    self.ignore = ignore_disconnectors
    # maps marker -> (callback, args, kwargs) registered via
    # notifyOnDisconnect(); tests invoke these to simulate a disconnect.
    self.disconnectors = {}
23 def notifyOnDisconnect(self, f, *args, **kwargs):
27 self.disconnectors[m] = (f, args, kwargs)
29 def dontNotifyOnDisconnect(self, marker):
32 del self.disconnectors[marker]
34 class FakeStatsProvider:
35 def count(self, name, delta=1):
37 def register_producer(self, producer):
40 class Bucket(unittest.TestCase):
def make_workdir(self, name):
    """Create storage/Bucket/<name>/ and its tmp/ subdirectory.

    Returns a (incoming, final) pair of paths: where a share is written
    while in progress, and where it will live once the bucket is closed.
    Neither file is created here, only the containing directories.
    """
    base = os.path.join("storage", "Bucket", name)
    tmpdir = os.path.join(base, "tmp")
    fileutil.make_dirs(base)
    fileutil.make_dirs(tmpdir)
    return os.path.join(tmpdir, "bucket"), os.path.join(base, "bucket")
49 def bucket_writer_closed(self, bw, consumed):
51 def add_latency(self, category, latency):
53 def count(self, name, delta=1):
58 renew_secret = os.urandom(32)
59 cancel_secret = os.urandom(32)
60 expiration_time = time.time() + 5000
61 return LeaseInfo(owner_num, renew_secret, cancel_secret,
62 expiration_time, "\x00" * 20)
64 def test_create(self):
65 incoming, final = self.make_workdir("test_create")
66 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
68 bw.remote_write(0, "a"*25)
69 bw.remote_write(25, "b"*25)
70 bw.remote_write(50, "c"*25)
71 bw.remote_write(75, "d"*7)
74 def test_readwrite(self):
75 incoming, final = self.make_workdir("test_readwrite")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*7) # last block may be short
84 br = BucketReader(self, bw.finalhome)
85 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
86 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
87 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
91 def callRemote(self, methname, *args, **kwargs):
93 meth = getattr(self.target, "remote_" + methname)
94 return meth(*args, **kwargs)
95 return defer.maybeDeferred(_call)
97 class BucketProxy(unittest.TestCase):
98 def make_bucket(self, name, size):
99 basedir = os.path.join("storage", "BucketProxy", name)
100 incoming = os.path.join(basedir, "tmp", "bucket")
101 final = os.path.join(basedir, "bucket")
102 fileutil.make_dirs(basedir)
103 fileutil.make_dirs(os.path.join(basedir, "tmp"))
104 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
110 def make_lease(self):
112 renew_secret = os.urandom(32)
113 cancel_secret = os.urandom(32)
114 expiration_time = time.time() + 5000
115 return LeaseInfo(owner_num, renew_secret, cancel_secret,
116 expiration_time, "\x00" * 20)
118 def bucket_writer_closed(self, bw, consumed):
120 def add_latency(self, category, latency):
122 def count(self, name, delta=1):
125 def test_create(self):
126 bw, rb, sharefname = self.make_bucket("test_create", 500)
127 bp = WriteBucketProxy(rb,
132 uri_extension_size=500, nodeid=None)
133 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
135 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
136 # Let's pretend each share has 100 bytes of data, and that there are
137 # 4 segments (25 bytes each), and 8 shares total. So the three
138 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
139 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
140 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
141 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
142 # long. That should make the whole share:
144 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
145 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
147 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
149 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
151 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
153 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
155 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
157 uri_extension = "s" + "E"*498 + "e"
159 bw, rb, sharefname = self.make_bucket(name, sharesize)
165 uri_extension_size=len(uri_extension),
169 d.addCallback(lambda res: bp.put_block(0, "a"*25))
170 d.addCallback(lambda res: bp.put_block(1, "b"*25))
171 d.addCallback(lambda res: bp.put_block(2, "c"*25))
172 d.addCallback(lambda res: bp.put_block(3, "d"*20))
173 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
174 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
175 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
176 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
177 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
178 d.addCallback(lambda res: bp.close())
180 # now read everything back
181 def _start_reading(res):
182 br = BucketReader(self, sharefname)
185 rbp = rbp_class(rb, peerid="abc")
186 self.failUnless("to peer" in repr(rbp))
187 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
189 d1 = rbp.startIfNecessary()
190 d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
191 d1.addCallback(lambda res: rbp.get_block(0))
192 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
193 d1.addCallback(lambda res: rbp.get_block(1))
194 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
195 d1.addCallback(lambda res: rbp.get_block(2))
196 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
197 d1.addCallback(lambda res: rbp.get_block(3))
198 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
200 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
201 d1.addCallback(lambda res:
202 self.failUnlessEqual(res, plaintext_hashes))
203 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
204 d1.addCallback(lambda res:
205 self.failUnlessEqual(res, crypttext_hashes))
206 d1.addCallback(lambda res: rbp.get_block_hashes())
207 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
208 d1.addCallback(lambda res: rbp.get_share_hashes())
209 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
210 d1.addCallback(lambda res: rbp.get_uri_extension())
211 d1.addCallback(lambda res:
212 self.failUnlessEqual(res, uri_extension))
216 d.addCallback(_start_reading)
220 def test_readwrite_v1(self):
221 return self._do_test_readwrite("test_readwrite_v1",
222 0x24, WriteBucketProxy, ReadBucketProxy)
224 def test_readwrite_v2(self):
225 return self._do_test_readwrite("test_readwrite_v2",
226 0x44, WriteBucketProxy_v2, ReadBucketProxy)
228 class Server(unittest.TestCase):
231 self.sparent = LoggingServiceParent()
232 self._lease_secret = itertools.count()
234 return self.sparent.stopService()
236 def workdir(self, name):
237 basedir = os.path.join("storage", "Server", name)
240 def create(self, name, sizelimit=None):
241 workdir = self.workdir(name)
242 ss = StorageServer(workdir, sizelimit,
243 stats_provider=FakeStatsProvider())
244 ss.setNodeID("\x00" * 20)
245 ss.setServiceParent(self.sparent)
def test_create(self):
    # Smoke test: constructing and starting a StorageServer via the
    # create() helper should succeed without raising.
    ss = self.create("test_create")
251 def allocate(self, ss, storage_index, sharenums, size, canary=None):
252 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
253 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
255 canary = FakeCanary()
256 return ss.remote_allocate_buckets(storage_index,
257 renew_secret, cancel_secret,
258 sharenums, size, canary)
260 def test_dont_overfill_dirs(self):
262 This test asserts that if you add a second share whose storage index
263 shares lots of leading bits with an extant share (but isn't the exact
264 same storage index), this won't add an entry to the share directory.
266 ss = self.create("test_dont_overfill_dirs")
267 already, writers = self.allocate(ss, "storageindex", [0], 10)
268 for i, wb in writers.items():
269 wb.remote_write(0, "%10d" % i)
271 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
273 children_of_storedir = set(os.listdir(storedir))
275 # Now store another one under another storageindex that has leading
276 # chars the same as the first storageindex.
277 already, writers = self.allocate(ss, "storageindey", [0], 10)
278 for i, wb in writers.items():
279 wb.remote_write(0, "%10d" % i)
281 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
283 new_children_of_storedir = set(os.listdir(storedir))
284 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
286 def test_remove_incoming(self):
287 ss = self.create("test_remove_incoming")
288 already, writers = self.allocate(ss, "vid", range(3), 10)
289 for i,wb in writers.items():
290 wb.remote_write(0, "%10d" % i)
292 incoming_share_dir = wb.incominghome
293 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
294 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
295 incoming_dir = os.path.dirname(incoming_prefix_dir)
296 self.failIf(os.path.exists(incoming_bucket_dir))
297 self.failIf(os.path.exists(incoming_prefix_dir))
298 self.failUnless(os.path.exists(incoming_dir))
300 def test_allocate(self):
301 ss = self.create("test_allocate")
303 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
305 canary = FakeCanary()
306 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
307 self.failUnlessEqual(already, set())
308 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
310 # while the buckets are open, they should not count as readable
311 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
314 for i,wb in writers.items():
315 wb.remote_write(0, "%25d" % i)
317 # aborting a bucket that was already closed is a no-op
320 # now they should be readable
321 b = ss.remote_get_buckets("allocate")
322 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
323 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
325 # now if we ask about writing again, the server should offer those
326 # three buckets as already present. It should offer them even if we
327 # don't ask about those specific ones.
328 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
329 self.failUnlessEqual(already, set([0,1,2]))
330 self.failUnlessEqual(set(writers.keys()), set([3,4]))
332 # while those two buckets are open for writing, the server should
333 # refuse to offer them to uploaders
335 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
336 self.failUnlessEqual(already2, set([0,1,2]))
337 self.failUnlessEqual(set(writers2.keys()), set([5]))
339 # aborting the writes should remove the tempfiles
340 for i,wb in writers2.items():
342 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
343 self.failUnlessEqual(already2, set([0,1,2]))
344 self.failUnlessEqual(set(writers2.keys()), set([5]))
346 for i,wb in writers2.items():
348 for i,wb in writers.items():
351 def test_disconnect(self):
352 # simulate a disconnection
353 ss = self.create("test_disconnect")
354 canary = FakeCanary()
355 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
356 self.failUnlessEqual(already, set())
357 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
358 for (f,args,kwargs) in canary.disconnectors.values():
363 # that ought to delete the incoming shares
364 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
365 self.failUnlessEqual(already, set())
366 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
368 def test_sizelimits(self):
369 ss = self.create("test_sizelimits", 5000)
370 # a newly created and filled share incurs this much overhead, beyond
371 # the size we request.
373 LEASE_SIZE = 4+32+32+4
374 canary = FakeCanary(True)
375 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
376 self.failUnlessEqual(len(writers), 3)
377 # now the StorageServer should have 3000 bytes provisionally
378 # allocated, allowing only 2000 more to be claimed
379 self.failUnlessEqual(len(ss._active_writers), 3)
381 # allocating 1001-byte shares only leaves room for one
382 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
383 self.failUnlessEqual(len(writers2), 1)
384 self.failUnlessEqual(len(ss._active_writers), 4)
386 # we abandon the first set, so their provisional allocation should be
390 self.failUnlessEqual(len(ss._active_writers), 1)
391 # now we have a provisional allocation of 1001 bytes
393 # and we close the second set, so their provisional allocation should
394 # become real, long-term allocation, and grows to include the
396 for bw in writers2.values():
397 bw.remote_write(0, "a"*25)
402 self.failUnlessEqual(len(ss._active_writers), 0)
404 allocated = 1001 + OVERHEAD + LEASE_SIZE
405 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
406 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
407 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
408 self.failUnlessEqual(len(writers3), 39)
409 self.failUnlessEqual(len(ss._active_writers), 39)
413 self.failUnlessEqual(len(ss._active_writers), 0)
414 ss.disownServiceParent()
417 # creating a new StorageServer in the same directory should see the
420 # metadata that goes into the share file is counted upon share close,
421 # as well as at startup. metadata that goes into other files will not
422 # be counted until the next startup, so if we were creating any
423 # extra-file metadata, the allocation would be more than 'allocated'
424 # and this test would need to be changed.
425 ss = self.create("test_sizelimits", 5000)
426 already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
427 self.failUnlessEqual(len(writers4), 39)
428 self.failUnlessEqual(len(ss._active_writers), 39)
431 basedir = self.workdir("test_seek_behavior")
432 fileutil.make_dirs(basedir)
433 filename = os.path.join(basedir, "testfile")
434 f = open(filename, "wb")
437 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
438 # files. mode="a" preserves previous contents but does not allow
439 # seeking-to-create-holes. mode="r+" allows both.
440 f = open(filename, "rb+")
444 filelen = os.stat(filename)[stat.ST_SIZE]
445 self.failUnlessEqual(filelen, 100+3)
446 f2 = open(filename, "rb")
447 self.failUnlessEqual(f2.read(5), "start")
450 def test_leases(self):
451 ss = self.create("test_leases")
452 canary = FakeCanary()
456 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
457 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
458 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
459 sharenums, size, canary)
460 self.failUnlessEqual(len(already), 0)
461 self.failUnlessEqual(len(writers), 5)
462 for wb in writers.values():
465 leases = list(ss.get_leases("si0"))
466 self.failUnlessEqual(len(leases), 1)
467 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
469 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
470 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
471 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
472 sharenums, size, canary)
473 for wb in writers.values():
476 # take out a second lease on si1
477 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
478 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
479 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
480 sharenums, size, canary)
481 self.failUnlessEqual(len(already), 5)
482 self.failUnlessEqual(len(writers), 0)
484 leases = list(ss.get_leases("si1"))
485 self.failUnlessEqual(len(leases), 2)
486 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
488 # check that si0 is readable
489 readers = ss.remote_get_buckets("si0")
490 self.failUnlessEqual(len(readers), 5)
492 # renew the first lease. Only the proper renew_secret should work
493 ss.remote_renew_lease("si0", rs0)
494 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
495 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
497 # check that si0 is still readable
498 readers = ss.remote_get_buckets("si0")
499 self.failUnlessEqual(len(readers), 5)
502 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
503 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
504 ss.remote_cancel_lease("si0", cs0)
506 # si0 should now be gone
507 readers = ss.remote_get_buckets("si0")
508 self.failUnlessEqual(len(readers), 0)
509 # and the renew should no longer work
510 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
513 # cancel the first lease on si1, leaving the second in place
514 ss.remote_cancel_lease("si1", cs1)
515 readers = ss.remote_get_buckets("si1")
516 self.failUnlessEqual(len(readers), 5)
517 # the corresponding renew should no longer work
518 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
520 leases = list(ss.get_leases("si1"))
521 self.failUnlessEqual(len(leases), 1)
522 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
524 ss.remote_renew_lease("si1", rs2)
525 # cancelling the second should make it go away
526 ss.remote_cancel_lease("si1", cs2)
527 readers = ss.remote_get_buckets("si1")
528 self.failUnlessEqual(len(readers), 0)
529 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
530 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
532 leases = list(ss.get_leases("si1"))
533 self.failUnlessEqual(len(leases), 0)
536 # test overlapping uploads
537 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
538 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
539 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
540 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
541 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
542 sharenums, size, canary)
543 self.failUnlessEqual(len(already), 0)
544 self.failUnlessEqual(len(writers), 5)
545 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
546 sharenums, size, canary)
547 self.failUnlessEqual(len(already2), 0)
548 self.failUnlessEqual(len(writers2), 0)
549 for wb in writers.values():
552 leases = list(ss.get_leases("si3"))
553 self.failUnlessEqual(len(leases), 1)
555 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
556 sharenums, size, canary)
557 self.failUnlessEqual(len(already3), 5)
558 self.failUnlessEqual(len(writers3), 0)
560 leases = list(ss.get_leases("si3"))
561 self.failUnlessEqual(len(leases), 2)
563 def test_readonly(self):
564 workdir = self.workdir("test_readonly")
565 ss = StorageServer(workdir, readonly_storage=True)
566 ss.setNodeID("\x00" * 20)
567 ss.setServiceParent(self.sparent)
569 canary = FakeCanary()
570 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
571 self.failUnlessEqual(already, set())
572 self.failUnlessEqual(writers, {})
574 def test_discard(self):
575 # discard is really only used for other tests, but we test it anyways
576 workdir = self.workdir("test_discard")
577 ss = StorageServer(workdir, discard_storage=True)
578 ss.setNodeID("\x00" * 20)
579 ss.setServiceParent(self.sparent)
581 canary = FakeCanary()
582 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
583 self.failUnlessEqual(already, set())
584 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
585 for i,wb in writers.items():
586 wb.remote_write(0, "%25d" % i)
588 # since we discard the data, the shares should be present but sparse.
589 # Since we write with some seeks, the data we read back will be all
591 b = ss.remote_get_buckets("vid")
592 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
593 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
595 def test_advise_corruption(self):
596 workdir = self.workdir("test_advise_corruption")
597 ss = StorageServer(workdir, discard_storage=True)
598 ss.setNodeID("\x00" * 20)
599 ss.setServiceParent(self.sparent)
601 si0_s = base32.b2a("si0")
602 ss.remote_advise_corrupt_share("immutable", "si0", 0,
603 "This share smells funny.\n")
604 reportdir = os.path.join(workdir, "corruption-advisories")
605 reports = os.listdir(reportdir)
606 self.failUnlessEqual(len(reports), 1)
607 report_si0 = reports[0]
608 self.failUnless(si0_s in report_si0, report_si0)
609 f = open(os.path.join(reportdir, report_si0), "r")
612 self.failUnless("type: immutable" in report)
613 self.failUnless(("storage_index: %s" % si0_s) in report)
614 self.failUnless("share_number: 0" in report)
615 self.failUnless("This share smells funny." in report)
617 # test the RIBucketWriter version too
618 si1_s = base32.b2a("si1")
619 already,writers = self.allocate(ss, "si1", [1], 75)
620 self.failUnlessEqual(already, set())
621 self.failUnlessEqual(set(writers.keys()), set([1]))
622 writers[1].remote_write(0, "data")
623 writers[1].remote_close()
625 b = ss.remote_get_buckets("si1")
626 self.failUnlessEqual(set(b.keys()), set([1]))
627 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
629 reports = os.listdir(reportdir)
630 self.failUnlessEqual(len(reports), 2)
631 report_si1 = [r for r in reports if si1_s in r][0]
632 f = open(os.path.join(reportdir, report_si1), "r")
635 self.failUnless("type: immutable" in report)
636 self.failUnless(("storage_index: %s" % si1_s) in report)
637 self.failUnless("share_number: 1" in report)
638 self.failUnless("This share tastes like dust." in report)
642 class MutableServer(unittest.TestCase):
645 self.sparent = LoggingServiceParent()
646 self._lease_secret = itertools.count()
648 return self.sparent.stopService()
650 def workdir(self, name):
651 basedir = os.path.join("storage", "MutableServer", name)
654 def create(self, name, sizelimit=None):
655 workdir = self.workdir(name)
656 ss = StorageServer(workdir, sizelimit)
657 ss.setServiceParent(self.sparent)
658 ss.setNodeID("\x00" * 20)
def test_create(self):
    # Smoke test: a StorageServer for the mutable-share tests can be
    # created and service-started without raising.
    ss = self.create("test_create")
def write_enabler(self, we_tag):
    # Derive a deterministic write-enabler secret from the given tag, so
    # tests can regenerate the same secret on demand.
    return hashutil.tagged_hash("we_blah", we_tag)
def renew_secret(self, tag):
    # Deterministic lease renew-secret; str() allows callers to pass
    # non-string tags (e.g. the ints from self._lease_secret).
    return hashutil.tagged_hash("renew_blah", str(tag))
def cancel_secret(self, tag):
    # Deterministic lease cancel-secret; str() allows callers to pass
    # non-string tags (e.g. the ints from self._lease_secret).
    return hashutil.tagged_hash("cancel_blah", str(tag))
673 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
674 write_enabler = self.write_enabler(we_tag)
675 renew_secret = self.renew_secret(lease_tag)
676 cancel_secret = self.cancel_secret(lease_tag)
677 rstaraw = ss.remote_slot_testv_and_readv_and_writev
678 testandwritev = dict( [ (shnum, ([], [], None) )
679 for shnum in sharenums ] )
681 rc = rstaraw(storage_index,
682 (write_enabler, renew_secret, cancel_secret),
685 (did_write, readv_data) = rc
686 self.failUnless(did_write)
687 self.failUnless(isinstance(readv_data, dict))
688 self.failUnlessEqual(len(readv_data), 0)
690 def test_container_size(self):
691 ss = self.create("test_container_size")
692 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
694 rstaraw = ss.remote_slot_testv_and_readv_and_writev
695 secrets = ( self.write_enabler("we1"),
696 self.renew_secret("we1"),
697 self.cancel_secret("we1") )
698 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
699 answer = rstaraw("si1", secrets,
700 {0: ([], [(0,data)], len(data)+12)},
702 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
704 # trying to make the container too large will raise an exception
705 TOOBIG = MutableShareFile.MAX_SIZE + 10
706 self.failUnlessRaises(DataTooLargeError,
707 rstaraw, "si1", secrets,
708 {0: ([], [(0,data)], TOOBIG)},
711 # it should be possible to make the container smaller, although at
712 # the moment this doesn't actually affect the share
713 answer = rstaraw("si1", secrets,
714 {0: ([], [(0,data)], len(data)+8)},
716 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
718 def test_allocate(self):
719 ss = self.create("test_allocate")
720 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
723 read = ss.remote_slot_readv
724 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
726 self.failUnlessEqual(read("si1", [], [(0, 10)]),
727 {0: [""], 1: [""], 2: [""]})
728 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
732 secrets = ( self.write_enabler("we1"),
733 self.renew_secret("we1"),
734 self.cancel_secret("we1") )
735 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
736 write = ss.remote_slot_testv_and_readv_and_writev
737 answer = write("si1", secrets,
738 {0: ([], [(0,data)], None)},
740 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
742 self.failUnlessEqual(read("si1", [0], [(0,20)]),
743 {0: ["00000000001111111111"]})
744 self.failUnlessEqual(read("si1", [0], [(95,10)]),
746 #self.failUnlessEqual(s0.remote_get_length(), 100)
748 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
749 f = self.failUnlessRaises(BadWriteEnablerError,
750 write, "si1", bad_secrets,
752 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
754 # this testv should fail
755 answer = write("si1", secrets,
756 {0: ([(0, 12, "eq", "444444444444"),
757 (20, 5, "eq", "22222"),
764 self.failUnlessEqual(answer, (False,
765 {0: ["000000000011", "22222"],
769 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
772 answer = write("si1", secrets,
773 {0: ([(10, 5, "lt", "11111"),
780 self.failUnlessEqual(answer, (False,
785 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
788 def test_operators(self):
789 # test operators, the data we're comparing is '11111' in all cases.
790 # test both fail+pass, reset data after each one.
791 ss = self.create("test_operators")
793 secrets = ( self.write_enabler("we1"),
794 self.renew_secret("we1"),
795 self.cancel_secret("we1") )
796 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
797 write = ss.remote_slot_testv_and_readv_and_writev
798 read = ss.remote_slot_readv
801 write("si1", secrets,
802 {0: ([], [(0,data)], None)},
808 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
813 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
814 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
815 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
818 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
823 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
824 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
827 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
832 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
833 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
837 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
842 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
843 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
846 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
851 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
855 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
860 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
861 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
865 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
870 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
871 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
874 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
879 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
880 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
884 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
889 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
890 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
893 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
898 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
899 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
903 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
908 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
909 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
912 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
917 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
918 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
921 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
926 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
927 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
931 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
936 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
937 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
940 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
945 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
946 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
949 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
954 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
955 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
958 # finally, test some operators against empty shares
959 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
964 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
965 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
968 def test_readv(self):
969 ss = self.create("test_readv")
970 secrets = ( self.write_enabler("we1"),
971 self.renew_secret("we1"),
972 self.cancel_secret("we1") )
973 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
974 write = ss.remote_slot_testv_and_readv_and_writev
975 read = ss.remote_slot_readv
976 data = [("%d" % i) * 100 for i in range(3)]
977 rc = write("si1", secrets,
978 {0: ([], [(0,data[0])], None),
979 1: ([], [(0,data[1])], None),
980 2: ([], [(0,data[2])], None),
982 self.failUnlessEqual(rc, (True, {}))
984 answer = read("si1", [], [(0, 10)])
985 self.failUnlessEqual(answer, {0: ["0"*10],
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """Assert two [(leasenum, LeaseInfo)] lists match, ignoring expiry.

    Used after operations (e.g. container expansion) that rewrite the
    lease area and may refresh expiration times but must preserve every
    other lease field.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip() is safe here because the lengths were just asserted equal;
    # pairwise iteration replaces the old index-based range(len()) loop.
    for (num_a, a), (num_b, b) in zip(leases_a, leases_b):
        self.failUnlessEqual(num_a, num_b)
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """Assert two [(leasenum, LeaseInfo)] lists match on every field,
    including expiration_time (stricter than
    compare_leases_without_timestamps).
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # zip() is safe here because the lengths were just asserted equal;
    # pairwise iteration replaces the old index-based range(len()) loop.
    for (num_a, a), (num_b, b) in zip(leases_a, leases_b):
        self.failUnlessEqual(num_a, num_b)
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
1012 def test_leases(self):
1013 ss = self.create("test_leases", sizelimit=1000*1000)
1015 return ( self.write_enabler("we1"),
1016 self.renew_secret("we1-%d" % n),
1017 self.cancel_secret("we1-%d" % n) )
1018 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1019 write = ss.remote_slot_testv_and_readv_and_writev
1020 read = ss.remote_slot_readv
1021 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1022 self.failUnlessEqual(rc, (True, {}))
1024 # create a random non-numeric file in the bucket directory, to
1025 # exercise the code that's supposed to ignore those.
1026 bucket_dir = os.path.join(self.workdir("test_leases"),
1027 "shares", storage_index_to_dir("si1"))
1028 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1029 f.write("you ought to be ignoring me\n")
1032 # re-allocate the slots and use the same secrets, that should update
1034 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1037 ss.remote_renew_lease("si1", secrets(0)[1])
1039 # now allocate them with a bunch of different secrets, to trigger the
1040 # extended lease code
1041 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1042 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
1043 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1044 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1045 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1047 # cancel one of them
1048 ss.remote_cancel_lease("si1", secrets(5)[2])
1050 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1051 all_leases = s0.debug_get_leases()
1052 self.failUnlessEqual(len(all_leases), 5)
1054 # and write enough data to expand the container, forcing the server
1055 # to move the leases
1056 write("si1", secrets(0),
1057 {0: ([], [(0,data)], 200), },
1060 # read back the leases, make sure they're still intact.
1061 self.compare_leases_without_timestamps(all_leases,
1062 s0.debug_get_leases())
1064 ss.remote_renew_lease("si1", secrets(0)[1])
1065 ss.remote_renew_lease("si1", secrets(1)[1])
1066 ss.remote_renew_lease("si1", secrets(2)[1])
1067 ss.remote_renew_lease("si1", secrets(3)[1])
1068 ss.remote_renew_lease("si1", secrets(4)[1])
1069 self.compare_leases_without_timestamps(all_leases,
1070 s0.debug_get_leases())
1071 # get a new copy of the leases, with the current timestamps. Reading
1072 # data and failing to renew/cancel leases should leave the timestamps
1074 all_leases = s0.debug_get_leases()
1075 # renewing with a bogus token should prompt an error message
1077 # examine the exception thus raised, make sure the old nodeid is
1078 # present, to provide for share migration
1079 e = self.failUnlessRaises(IndexError,
1080 ss.remote_renew_lease, "si1",
1083 self.failUnless("Unable to renew non-existent lease" in e_s)
1084 self.failUnless("I have leases accepted by nodeids:" in e_s)
1085 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1087 # same for cancelling
1088 self.failUnlessRaises(IndexError,
1089 ss.remote_cancel_lease, "si1",
1091 self.compare_leases(all_leases, s0.debug_get_leases())
1093 # reading shares should not modify the timestamp
1094 read("si1", [], [(0,200)])
1095 self.compare_leases(all_leases, s0.debug_get_leases())
1097 write("si1", secrets(0),
1098 {0: ([], [(200, "make me bigger")], None)}, [])
1099 self.compare_leases_without_timestamps(all_leases,
1100 s0.debug_get_leases())
1102 write("si1", secrets(0),
1103 {0: ([], [(500, "make me really bigger")], None)}, [])
1104 self.compare_leases_without_timestamps(all_leases,
1105 s0.debug_get_leases())
1107 # now cancel them all
1108 ss.remote_cancel_lease("si1", secrets(0)[2])
1109 ss.remote_cancel_lease("si1", secrets(1)[2])
1110 ss.remote_cancel_lease("si1", secrets(2)[2])
1111 ss.remote_cancel_lease("si1", secrets(3)[2])
1113 # the slot should still be there
1114 remaining_shares = read("si1", [], [(0,10)])
1115 self.failUnlessEqual(len(remaining_shares), 1)
1116 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1118 # cancelling a non-existent lease should raise an IndexError
1119 self.failUnlessRaises(IndexError,
1120 ss.remote_cancel_lease, "si1", "nonsecret")
1122 # and the slot should still be there
1123 remaining_shares = read("si1", [], [(0,10)])
1124 self.failUnlessEqual(len(remaining_shares), 1)
1125 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1127 ss.remote_cancel_lease("si1", secrets(4)[2])
1128 # now the slot should be gone
1129 no_shares = read("si1", [], [(0,10)])
1130 self.failUnlessEqual(no_shares, {})
1132 # cancelling a lease on a non-existent share should raise an IndexError
1133 self.failUnlessRaises(IndexError,
1134 ss.remote_cancel_lease, "si2", "nonsecret")
1137 class Stats(unittest.TestCase):
1140 self.sparent = LoggingServiceParent()
1141 self._lease_secret = itertools.count()
1143 return self.sparent.stopService()
1145 def workdir(self, name):
1146 basedir = os.path.join("storage", "Server", name)
1149 def create(self, name, sizelimit=None):
1150 workdir = self.workdir(name)
1151 ss = StorageServer(workdir, sizelimit)
1152 ss.setNodeID("\x00" * 20)
1153 ss.setServiceParent(self.sparent)
1156 def test_latencies(self):
1157 ss = self.create("test_latencies")
1158 for i in range(10000):
1159 ss.add_latency("allocate", 1.0 * i)
1160 for i in range(1000):
1161 ss.add_latency("renew", 1.0 * i)
1163 ss.add_latency("cancel", 2.0 * i)
1164 ss.add_latency("get", 5.0)
1166 output = ss.get_latencies()
1168 self.failUnlessEqual(sorted(output.keys()),
1169 sorted(["allocate", "renew", "cancel", "get"]))
1170 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1171 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1172 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1173 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1174 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1175 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1176 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1177 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1178 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1180 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1181 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1182 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1183 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1184 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1185 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1186 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1187 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1188 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1190 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1191 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1192 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1193 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1194 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1195 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1196 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1197 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1198 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1200 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1201 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1202 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1203 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1204 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1205 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1206 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1207 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1208 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)