2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil, base32
9 from allmydata.storage import BucketWriter, BucketReader, \
10 StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError, LeaseInfo
12 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
14 from allmydata.interfaces import BadWriteEnablerError
15 from allmydata.test.common import LoggingServiceParent
    def __init__(self, ignore_disconnectors=False):
        # When True, disconnect-callback registrations are to be ignored.
        self.ignore = ignore_disconnectors
        # marker -> (callback, args, kwargs) for registered disconnect hooks
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Record the callback so a test can later invoke it to simulate a
        # client disconnection.
        # NOTE(review): 'm' is presumably a freshly created unique marker
        # object returned to the caller -- confirm against full source.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # Cancel a previously registered disconnect callback by its marker.
        del self.disconnectors[marker]
class FakeStatsProvider:
    # Minimal stand-in for a stats provider: accepts counter increments and
    # producer registrations without recording anything.
    def count(self, name, delta=1):
    def register_producer(self, producer):
class Bucket(unittest.TestCase):
    # Tests for BucketWriter/BucketReader operating directly on share files.
    def make_workdir(self, name):
        # Per-test directory layout: shares are written into tmp/ first and
        # moved to the final location when the writer closes.
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final
    # The test case itself plays the role of the storage server for the
    # BucketWriter, so it must provide these callback hooks; they
    # intentionally do nothing interesting here.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
        # Build a LeaseInfo with random secrets; expires ~5000s from now.
        # NOTE(review): owner_num presumably defaults to 0 in the elided
        # method header -- confirm.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def test_create(self):
        # Non-overlapping writes that together cover the 200-byte allocation
        # should all be accepted by the BucketWriter.
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        # Write a short share, then read every span back through a
        # BucketReader pointed at the final (closed) share file.
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        # the share is read from its final home once the writer has closed
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    def callRemote(self, methname, *args, **kwargs):
        # Dispatch callRemote("foo", ...) to self.target.remote_foo(...),
        # returning a Deferred as a real Foolscap remote call would.
        # NOTE(review): the getattr/apply pair presumably lives inside a
        # local _call() helper passed to maybeDeferred -- confirm.
        meth = getattr(self.target, "remote_" + methname)
        return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
    # Tests for the Write/ReadBucketProxy share-layout wrappers.
    def make_bucket(self, name, size):
        # Create a BucketWriter of the given size using the same tmp/final
        # layout as the Bucket tests above.
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    def make_lease(self):
        # Fresh LeaseInfo with random secrets; expires ~5000s from now.
        # NOTE(review): owner_num presumably comes from an elided default --
        # confirm.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    # No-op server-side callback hooks required by BucketWriter; the test
    # case poses as the storage server.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
    def test_create(self):
        # A freshly built WriteBucketProxy must provide the
        # IStorageBucketWriter interface.
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Round-trip a synthetic share through the given write/read proxy
        # classes and verify every field reads back unchanged.
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        # deterministic dummy hash-tree contents for each field
        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        uri_extension_size=len(uri_extension),
        # write the whole share through the proxy, field by field
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rbp = rbp_class(rb, peerid="abc")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
        d.addCallback(_start_reading)
    def test_readwrite_v1(self):
        # v1 layout: 0x24-byte header, 4-byte offset fields.
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)
    def test_readwrite_v2(self):
        # v2 layout: 0x44-byte header (wider offset fields).
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    # StorageServer whose free-disk probe is controllable by tests:
    # stat_disk ignores the path and returns the canned DISKAVAIL value.
    def stat_disk(self, d):
        return self.DISKAVAIL
class Server(unittest.TestCase):
    # Tests for the immutable-share side of StorageServer.
        # setUp: parent service for child storage servers, plus a counter
        # used to generate unique lease secrets per allocation
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        # tearDown: stopping the parent also stops the storage servers
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under ./storage/Server/<name>
        basedir = os.path.join("storage", "Server", name)
    def create(self, name, reserved_space=0, klass=StorageServer):
        # Build a storage server (class is overridable for fakes) in a fresh
        # workdir and attach it to the test's service parent.
        workdir = self.workdir(name)
        ss = klass(workdir, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
    def test_create(self):
        # mere construction must succeed
        ss = self.create("test_create")
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        # Helper around remote_allocate_buckets, minting fresh lease secrets
        # each call.  (Python 2 iterator protocol: .next())
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): presumably only assigned when the caller passed no
        # canary -- confirm the guard in the full source.
        canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_dont_overfill_dirs(self):
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        # same top-level prefix directory set: no new entry was added
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        # After writers close, the incoming/ bucket and prefix directories
        # for this storage index should be pruned, but incoming/ itself
        # must remain.
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
        # walk upward from the share file to the incoming/ root
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))
    def test_allocate(self):
        # Full allocate/write/read lifecycle for immutable buckets,
        # including double-allocation and abort behavior.
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # aborting a bucket that was already closed is a no-op
        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
        for i,wb in writers.items():
    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # fire the disconnect callbacks the server registered on the canary
        for (f,args,kwargs) in canary.disconnectors.values():
        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    def test_reserved_space(self):
        # Provisional allocations must be limited by (available - reserved)
        # disk space; aborted writers release their allocation, closed ones
        # convert it into real disk usage.
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        # 15k available, 10k reserved, leaves 5k for shares
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # we have to manually increase DISKAVAIL, since we're not doing real
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        # (body of the seek-behavior test: verify that seeking past EOF and
        # writing extends the file as the share code assumes)
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")

        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")

        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        # Lease lifecycle on immutable shares: add, renew (right secret
        # only), cancel (last cancel deletes the shares), plus overlapping
        # uploads each contributing a lease.
        ss = self.create("test_leases")
        canary = FakeCanary()

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                      sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
    def test_readonly(self):
        # A readonly server must refuse all allocations and advertise zero
        # available disk space.
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        # Corruption advisories (both server-level and per-bucket) must be
        # written as report files into corruption-advisories/.
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)
class MutableServer(unittest.TestCase):
    # Tests for the mutable-slot side of StorageServer.
        # setUp: parent service plus a counter for unique lease secrets
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        # tearDown: stopping the parent also stops the storage servers
        return self.sparent.stopService()
    def workdir(self, name):
        # Per-test working directory under ./storage/MutableServer/<name>
        basedir = os.path.join("storage", "MutableServer", name)
    def create(self, name):
        # Build a plain StorageServer in a fresh workdir and attach it to
        # the test's service parent.
        workdir = self.workdir(name)
        ss = StorageServer(workdir)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 20)
    def test_create(self):
        # mere construction must succeed
        ss = self.create("test_create")
    def write_enabler(self, we_tag):
        # deterministic write-enabler secret derived from the tag
        return hashutil.tagged_hash("we_blah", we_tag)
    def renew_secret(self, tag):
        # deterministic lease-renewal secret derived from the tag
        return hashutil.tagged_hash("renew_blah", str(tag))
    def cancel_secret(self, tag):
        # deterministic lease-cancel secret derived from the tag
        return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        # Create empty mutable slots via testv_and_readv_and_writev with
        # empty test/write vectors, and sanity-check the response.
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_container_size(self):
        # Growing a mutable container is allowed up to MAX_SIZE; shrinking
        # is accepted (though currently a no-op on the share itself).
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
    def test_allocate(self):
        # Basic mutable slot behavior: empty reads, writes, reads past EOF,
        # bad write-enabler rejection, and failing test vectors.
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write enabler must be rejected, and the error message must
        # name the nodeid that recorded the original enabler
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        write("si1", secrets,
              {0: ([], [(0,data)], None)},

        # "lt": fails (equal), leaves data untouched
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # "lt" passes: share rewritten with "y"*100
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "le" fails below, passes at-or-above
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "eq": exact-match only
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "ne": anything-but
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # "ge": passes at-or-above, fails above the stored value
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # "gt": strictly-above only
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_readv(self):
        # remote_slot_readv with an empty sharenum list must read from all
        # shares of the slot.
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # NOTE(review): this string value of 'data' is dead -- it is
        # immediately shadowed by the per-share list below.
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
999 def compare_leases_without_timestamps(self, leases_a, leases_b):
1000 self.failUnlessEqual(len(leases_a), len(leases_b))
1001 for i in range(len(leases_a)):
1002 num_a, a = leases_a[i]
1003 num_b, b = leases_b[i]
1004 self.failUnlessEqual(num_a, num_b)
1005 self.failUnlessEqual(a.owner_num, b.owner_num)
1006 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1007 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1008 self.failUnlessEqual(a.nodeid, b.nodeid)
1010 def compare_leases(self, leases_a, leases_b):
1011 self.failUnlessEqual(len(leases_a), len(leases_b))
1012 for i in range(len(leases_a)):
1013 num_a, a = leases_a[i]
1014 num_b, b = leases_b[i]
1015 self.failUnlessEqual(num_a, num_b)
1016 self.failUnlessEqual(a.owner_num, b.owner_num)
1017 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1018 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1019 self.failUnlessEqual(a.nodeid, b.nodeid)
1020 self.failUnlessEqual(a.expiration_time, b.expiration_time)
    def test_leases(self):
        # Exercise the full lease lifecycle on a mutable slot: create,
        # renew, extend past the embedded-lease capacity, cancel, and the
        # error paths for bogus renew/cancel secrets.
        ss = self.create("test_leases")
        # NOTE(review): the `def secrets(n):` header introducing this local
        # helper is elided in this chunk view; the tuple below is its body.
        # It derives a distinct (write-enabler, renew, cancel) triple per n.
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")

        # re-allocate the slots and use the same secrets, that should update
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
        # NOTE(review): the closing arguments of this write() call are
        # elided in this chunk view.

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        all_leases = s0.debug_get_leases()

        # renewing with a bogus token should prompt an error message
        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        # NOTE(review): the bogus-secret argument and the `e_s = str(e)`
        # line are elided in this chunk view.
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        # NOTE(review): the bogus-secret argument of this call is elided.
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")
class Stats(unittest.TestCase):
    # Tests for the StorageServer's latency/statistics bookkeeping.
    # NOTE(review): several method headers (setUp/tearDown) and `return`
    # lines are elided in this chunk view; the statements below belong to
    # those elided definitions.

        self.sparent = LoggingServiceParent()
        # monotonically-increasing counter, used to derive unique secrets
        self._lease_secret = itertools.count()
        return self.sparent.stopService()

    def workdir(self, name):
        # per-test working directory under ./storage/Server/<name>
        # NOTE(review): the `return` of basedir is elided in this view.
        basedir = os.path.join("storage", "Server", name)

    def create(self, name):
        # Build a StorageServer rooted in this test's workdir and attach it
        # to the logging service parent so it is stopped at tearDown.
        workdir = self.workdir(name)
        ss = StorageServer(workdir)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
    def test_latencies(self):
        # Feed known latency samples into several categories, then verify
        # the per-category mean and percentile buckets that get_latencies()
        # computes, including the behavior for very small sample counts.
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        # NOTE(review): the `for i in range(10):` loop header for the
        # "cancel" samples is elided in this chunk view.
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # only the most recent samples are retained: 10000 added, 1000 kept
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        # retained samples are 9000..9999, so mean is ~9500 and the
        # percentiles fall at 9010/9100/9500/...
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        # exactly 1000 samples (0..999): nothing was dropped
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        # small sample count: high percentiles all collapse onto the max
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        # a single sample: every statistic equals that sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)