2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil, base32
9 from allmydata.storage import BucketWriter, BucketReader, \
10 StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError, LeaseInfo
12 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
14 from allmydata.interfaces import BadWriteEnablerError
15 from allmydata.test.common import LoggingServiceParent
    # NOTE(review): the enclosing 'class FakeCanary' statement is outside this
    # view. This fake stands in for foolscap's canary object and records
    # disconnect-notification registrations so tests can fire them manually.
    def __init__(self, ignore_disconnectors=False):
        # when True, registrations are ignored (see elided guards below)
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # NOTE(review): lines elided here — 'm' is created above this point
        # (presumably a unique marker object); confirm against the full file.
        self.disconnectors[m] = (f, args, kwargs)
    def dontNotifyOnDisconnect(self, marker):
        # NOTE(review): lines elided here (likely the self.ignore guard).
        del self.disconnectors[marker]
class FakeStatsProvider:
    """Stub stats provider: both hooks deliberately do nothing in tests."""
    def count(self, name, delta=1):
        """Ignore counter increments (original no-op body elided)."""
    def register_producer(self, producer):
        """Ignore producer registrations (original no-op body elided)."""
40 class Bucket(unittest.TestCase):
41 def make_workdir(self, name):
42 basedir = os.path.join("storage", "Bucket", name)
43 incoming = os.path.join(basedir, "tmp", "bucket")
44 final = os.path.join(basedir, "bucket")
45 fileutil.make_dirs(basedir)
46 fileutil.make_dirs(os.path.join(basedir, "tmp"))
47 return incoming, final
    def bucket_writer_closed(self, bw, consumed):
        """BucketWriter close callback; intentionally a no-op (body elided)."""
    def add_latency(self, category, latency):
        """Stats hook; intentionally a no-op (body elided)."""
    def count(self, name, delta=1):
        """Stats hook; intentionally a no-op (body elided)."""
        # NOTE(review): the enclosing 'def make_lease(...)' line is elided
        # above; 'owner_num' presumably comes from its signature — confirm.
        # Builds a fresh LeaseInfo with random secrets and a ~83-minute expiry.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def test_create(self):
        """Write 82 bytes into a fresh 200-byte BucketWriter in four slices."""
        incoming, final = self.make_workdir("test_create")
        # NOTE(review): the BucketWriter(...) call is truncated — its
        # remaining arguments and closing paren are on elided lines.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
    def test_readwrite(self):
        """Write three blocks, then read them back via a BucketReader."""
        incoming, final = self.make_workdir("test_readwrite")
        # NOTE(review): BucketWriter(...) call truncated; remaining
        # arguments are on elided lines.
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        # NOTE(review): lines elided here (the writer is closed before
        # reading — bw.finalhome exists below).
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
    # NOTE(review): the enclosing class (a fake remote-bucket wrapper) is not
    # visible in this view.
    def callRemote(self, methname, *args, **kwargs):
        # Dispatches "methname" to self.target.remote_<methname> inside a
        # deferred. NOTE(review): the 'def _call():' line is elided between
        # the signature and the doubly-indented body below.
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
97 class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        """Create a BucketWriter under storage/BucketProxy/<name>.

        NOTE(review): the tail of this method (rest of the BucketWriter call
        and the return statement) is on elided lines.
        """
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    def make_lease(self):
        """Return a fresh LeaseInfo with random renew/cancel secrets."""
        # NOTE(review): the line defining owner_num is elided here.
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
    def bucket_writer_closed(self, bw, consumed):
        """BucketWriter close callback; intentionally a no-op (body elided)."""
    def add_latency(self, category, latency):
        """Stats hook; intentionally a no-op (body elided)."""
    def count(self, name, delta=1):
        """Stats hook; intentionally a no-op (body elided)."""
    def test_create(self):
        """Smoke-test: WriteBucketProxy provides IStorageBucketWriter."""
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        # NOTE(review): WriteBucketProxy(...) call truncated; its middle
        # arguments are on elided lines.
        bp = WriteBucketProxy(rb,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        """Write a full share via wbp_class, then read it back via rbp_class.

        NOTE(review): several continuation lines of this method are elided
        (comprehension tails, the wbp_class construction, and the returns).
        """
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
        # NOTE(review): the 'for i in range(...)' tails of these three
        # comprehensions are on elided lines.
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"
        bw, rb, sharefname = self.make_bucket(name, sharesize)
        # NOTE(review): the wbp_class(...) construction of 'bp' and the
        # creation of deferred 'd' are elided; only one keyword survives.
                            uri_extension_size=len(uri_extension),
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))
            # NOTE(review): 'return d1' elided here.
        d.addCallback(_start_reading)
        # NOTE(review): 'return d' elided here.
214 def test_readwrite_v1(self):
215 return self._do_test_readwrite("test_readwrite_v1",
216 0x24, WriteBucketProxy, ReadBucketProxy)
218 def test_readwrite_v2(self):
219 return self._do_test_readwrite("test_readwrite_v2",
220 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    """StorageServer whose free-disk-space probe is scripted by the test.

    Tests assign the DISKAVAIL attribute to simulate available disk space.
    """
    def stat_disk(self, d):
        # ignore the directory argument; report the scripted figure
        return self.DISKAVAIL
226 class Server(unittest.TestCase):
        # NOTE(review): 'def setUp(self):' line elided above.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        # NOTE(review): 'def tearDown(self):' line elided above this return.
        return self.sparent.stopService()

    def workdir(self, name):
        # NOTE(review): the return joining further path components is elided.
        basedir = os.path.join("storage", "Server", name)
    def create(self, name, reserved_space=0, klass=StorageServer):
        """Build a started StorageServer (or subclass) in this test's workdir.

        NOTE(review): the trailing 'return ss' line is elided below.
        """
        workdir = self.workdir(name)
        ss = klass(workdir, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
    def test_create(self):
        """Constructing and starting a server should simply succeed."""
        ss = self.create("test_create")
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        """Call remote_allocate_buckets with fresh per-call lease secrets."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        # NOTE(review): an elided line above this guards it (presumably
        # 'if not canary:') so a default FakeCanary is used when none given.
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        """A share larger than 2**32 bytes can be written and read back."""
        ss = self.create("test_large_share")
        # NOTE(review): one line elided here.
        already,writers = self.allocate(ss, "allocate", [0,1,2], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # Python 2 idiom: dict.items() returns a list, so [0] is valid here.
        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()
        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): loop continues on elided lines (writer close).
        # NOTE(review): os.path.join call truncated; its remaining path
        # component(s) are on an elided line.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        children_of_storedir = set(os.listdir(storedir))
        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): loop continues on elided lines.
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
    def test_remove_incoming(self):
        """After writers finish, the incoming/ bucket dirs are pruned, but
        the top-level incoming/ directory itself survives."""
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            # NOTE(review): loop continues on elided lines (writer close).
        # 'wb' deliberately leaks out of the loop: inspect the last writer
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))
    def test_allocate(self):
        """Exercise allocate/write/read plus re-allocation and aborts.

        NOTE(review): the bodies of several for-loops below (writer
        close/abort calls) are on elided lines.
        """
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): remainder of loop body elided.
        # aborting a bucket that was already closed is a no-op
        # NOTE(review): the statement exercising that is elided.

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            # NOTE(review): loop body elided (abort call).
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            # NOTE(review): loop body elided.
        for i,wb in writers.items():
            # NOTE(review): loop body elided.
    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            # NOTE(review): loop body elided — it fires each registered
            # disconnect callback.
        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
382 def test_reserved_space(self):
383 ss = self.create("test_reserved_space", reserved_space=10000,
384 klass=FakeDiskStorageServer)
385 # the FakeDiskStorageServer doesn't do real statvfs() calls
387 # 15k available, 10k reserved, leaves 5k for shares
389 # a newly created and filled share incurs this much overhead, beyond
390 # the size we request.
392 LEASE_SIZE = 4+32+32+4
393 canary = FakeCanary(True)
394 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
395 self.failUnlessEqual(len(writers), 3)
396 # now the StorageServer should have 3000 bytes provisionally
397 # allocated, allowing only 2000 more to be claimed
398 self.failUnlessEqual(len(ss._active_writers), 3)
400 # allocating 1001-byte shares only leaves room for one
401 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
402 self.failUnlessEqual(len(writers2), 1)
403 self.failUnlessEqual(len(ss._active_writers), 4)
405 # we abandon the first set, so their provisional allocation should be
409 self.failUnlessEqual(len(ss._active_writers), 1)
410 # now we have a provisional allocation of 1001 bytes
412 # and we close the second set, so their provisional allocation should
413 # become real, long-term allocation, and grows to include the
415 for bw in writers2.values():
416 bw.remote_write(0, "a"*25)
421 self.failUnlessEqual(len(ss._active_writers), 0)
423 allocated = 1001 + OVERHEAD + LEASE_SIZE
425 # we have to manually increase DISKAVAIL, since we're not doing real
427 ss.DISKAVAIL -= allocated
429 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
430 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
431 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
432 self.failUnlessEqual(len(writers3), 39)
433 self.failUnlessEqual(len(ss._active_writers), 39)
437 self.failUnlessEqual(len(ss._active_writers), 0)
438 ss.disownServiceParent()
        # NOTE(review): the enclosing 'def test_seek_behavior(self):' line is
        # elided above. Checks that seeking past EOF in "rb+" mode creates a
        # hole rather than truncating.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        # NOTE(review): lines elided (initial write and close).
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        # NOTE(review): lines elided (seek/write/close).
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
    def test_leases(self):
        """Exercise lease add/renew/cancel on immutable shares.

        NOTE(review): the lines defining 'sharenums' and 'size', and the
        bodies of the writer-draining for-loops, are elided in this view.
        """
        ss = self.create("test_leases")
        canary = FakeCanary()

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            # NOTE(review): loop body elided (writer close).

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            # NOTE(review): loop body elided (writer close).

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # cancel with the wrong secrets first; only cs0 should succeed
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            # NOTE(review): loop body elided (writer close).

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
    def test_readonly(self):
        """A readonly server accepts no writes and reports zero disk_avail."""
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        # NOTE(review): this assertion's expected-value argument and closing
        # paren are on an elided line.
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            # NOTE(review): remainder of loop body elided (writer close).
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        """Corruption advisories land as readable report files on disk."""
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        # NOTE(review): lines elided — 'report' is presumably read from f
        # here; confirm against the full file.
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        # NOTE(review): lines elided — 'report' is presumably read from f.
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)
662 class MutableServer(unittest.TestCase):
        # NOTE(review): 'def setUp(self):' line elided above.
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        # NOTE(review): 'def tearDown(self):' line elided above this return.
        return self.sparent.stopService()

    def workdir(self, name):
        # NOTE(review): the return joining further path components is elided.
        basedir = os.path.join("storage", "MutableServer", name)

    def create(self, name):
        """Build a started StorageServer ('return ss' elided below)."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 20)

    def test_create(self):
        """Constructing and starting a server should simply succeed."""
        ss = self.create("test_create")
    def write_enabler(self, we_tag):
        """Deterministic write-enabler derived from the given tag."""
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        """Deterministic lease renew-secret derived from the given tag."""
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        """Deterministic lease cancel-secret derived from the given tag."""
        return hashutil.tagged_hash("cancel_blah", str(tag))
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        """Create empty mutable slots using an empty test-and-write vector."""
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        # NOTE(review): the call's trailing arguments (testandwritev and a
        # read vector) are on elided lines.
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
    def test_container_size(self):
        """Growing past MAX_SIZE raises DataTooLargeError; shrinking is OK.

        NOTE(review): several call continuations (sharenums/size arguments,
        read vectors, closing parens) are on elided lines.
        """
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
    def test_allocate(self):
        """Basic mutable-slot write/read, plus write-enabler enforcement.

        NOTE(review): many call continuations (expected read values, test
        vectors, closing parens) are on elided lines throughout.
        """
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
        # an empty shnum list means "all shares"
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
        # the message reveals which node recorded the write enabler
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False,
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        # NOTE(review): every write(...) below is truncated — the rest of its
        # test vector, write vector, and read vector are on elided lines, as
        # is the reset step between cases.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        write("si1", secrets,
              {0: ([], [(0,data)], None)},

        # lt (failing and passing cases)
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        # ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
    def test_readv(self):
        """A readv with an empty shnum list returns data for every share."""
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        # 'data' is rebound: three 100-byte shares of repeated digits
        data = [("%d" % i) * 100 for i in range(3)]
        # NOTE(review): the write(...) call's closing arguments (read vector,
        # closing parens) are on elided lines.
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        # NOTE(review): the expected dict's entries for shares 1 and 2 are on
        # elided lines.
        self.failUnlessEqual(answer, {0: ["0"*10],
1009 def compare_leases_without_timestamps(self, leases_a, leases_b):
1010 self.failUnlessEqual(len(leases_a), len(leases_b))
1011 for i in range(len(leases_a)):
1012 num_a, a = leases_a[i]
1013 num_b, b = leases_b[i]
1014 self.failUnlessEqual(num_a, num_b)
1015 self.failUnlessEqual(a.owner_num, b.owner_num)
1016 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1017 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1018 self.failUnlessEqual(a.nodeid, b.nodeid)
1020 def compare_leases(self, leases_a, leases_b):
1021 self.failUnlessEqual(len(leases_a), len(leases_b))
1022 for i in range(len(leases_a)):
1023 num_a, a = leases_a[i]
1024 num_b, b = leases_b[i]
1025 self.failUnlessEqual(num_a, num_b)
1026 self.failUnlessEqual(a.owner_num, b.owner_num)
1027 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1028 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1029 self.failUnlessEqual(a.nodeid, b.nodeid)
1030 self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
    """Exercise the full lease lifecycle on a mutable slot: create,
    re-allocate with the same secrets (lease update), renew, extend
    with many distinct secrets, cancel, and verify that leases survive
    a container expansion (which forces the server to relocate them).

    NOTE(review): elided lines were reconstructed — the 'def secrets(n):'
    header, f.close(), the closing ')]' of the expanding write, and the
    'e_s = str(e)' / bogus-secret argument lines were missing from the
    truncated source.
    """
    ss = self.create("test_leases")
    def secrets(n):
        # a distinct (write-enabler, renew-secret, cancel-secret) tuple
        # per caller index n; the write-enabler is shared
        return ( self.write_enabler("we1"),
                 self.renew_secret("we1-%d" % n),
                 self.cancel_secret("we1-%d" % n) )
    data = "".join([ ("%d" % i) * 10 for i in range(10) ])
    write = ss.remote_slot_testv_and_readv_and_writev
    read = ss.remote_slot_readv
    rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
    self.failUnlessEqual(rc, (True, {}))

    # create a random non-numeric file in the bucket directory, to
    # exercise the code that's supposed to ignore those.
    bucket_dir = os.path.join(self.workdir("test_leases"),
                              "shares", storage_index_to_dir("si1"))
    f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
    f.write("you ought to be ignoring me\n")
    f.close()

    # re-allocate the slots and use the same secrets, that should update
    # the lease
    write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

    # renew it directly
    ss.remote_renew_lease("si1", secrets(0)[1])

    # now allocate them with a bunch of different secrets, to trigger the
    # extended lease code
    write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
    write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
    write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
    write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
    write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

    # cancel one of them
    ss.remote_cancel_lease("si1", secrets(5)[2])

    s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
    all_leases = s0.debug_get_leases()
    self.failUnlessEqual(len(all_leases), 5)

    # and write enough data to expand the container, forcing the server
    # to move the leases
    write("si1", secrets(0),
          {0: ([], [(0,data)], 200), },
          [])

    # read back the leases, make sure they're still intact.
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())

    ss.remote_renew_lease("si1", secrets(0)[1])
    ss.remote_renew_lease("si1", secrets(1)[1])
    ss.remote_renew_lease("si1", secrets(2)[1])
    ss.remote_renew_lease("si1", secrets(3)[1])
    ss.remote_renew_lease("si1", secrets(4)[1])
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())
    # get a new copy of the leases, with the current timestamps. Reading
    # data and failing to renew/cancel leases should leave the timestamps
    # alone.
    all_leases = s0.debug_get_leases()
    # renewing with a bogus token should prompt an error message

    # examine the exception thus raised, make sure the old nodeid is
    # present, to provide for share migration
    e = self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
    e_s = str(e)
    self.failUnless("Unable to renew non-existent lease" in e_s)
    self.failUnless("I have leases accepted by nodeids:" in e_s)
    self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

    # same for cancelling
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si1",
                          secrets(20)[2])
    self.compare_leases(all_leases, s0.debug_get_leases())

    # reading shares should not modify the timestamp
    read("si1", [], [(0,200)])
    self.compare_leases(all_leases, s0.debug_get_leases())

    # in-place and expanding writes renew the lease, so only compare
    # the timestamp-independent fields from here on
    write("si1", secrets(0),
          {0: ([], [(200, "make me bigger")], None)}, [])
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())

    write("si1", secrets(0),
          {0: ([], [(500, "make me really bigger")], None)}, [])
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())

    # now cancel them all
    ss.remote_cancel_lease("si1", secrets(0)[2])
    ss.remote_cancel_lease("si1", secrets(1)[2])
    ss.remote_cancel_lease("si1", secrets(2)[2])
    ss.remote_cancel_lease("si1", secrets(3)[2])

    # the slot should still be there
    remaining_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(len(remaining_shares), 1)
    self.failUnlessEqual(len(s0.debug_get_leases()), 1)

    # cancelling a non-existent lease should raise an IndexError
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si1", "nonsecret")

    # and the slot should still be there
    remaining_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(len(remaining_shares), 1)
    self.failUnlessEqual(len(s0.debug_get_leases()), 1)

    ss.remote_cancel_lease("si1", secrets(4)[2])
    # now the slot should be gone
    no_shares = read("si1", [], [(0,10)])
    self.failUnlessEqual(no_shares, {})

    # cancelling a lease on a non-existent share should raise an IndexError
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si2", "nonsecret")
1157 class Stats(unittest.TestCase):
def setUp(self):
    """Create the parent service and a fresh lease-secret counter.

    NOTE(review): the 'def setUp(self):' and 'def tearDown(self):'
    header lines were elided in the truncated source and have been
    restored here.
    """
    self.sparent = LoggingServiceParent()
    self._lease_secret = itertools.count()
def tearDown(self):
    # returning the Deferred lets trial wait for the service to stop
    return self.sparent.stopService()
def workdir(self, name):
    """Return the per-test working directory under ./storage.

    NOTE(review): the truncated source was missing the return statement,
    which would have made this method return None and broken create().
    """
    basedir = os.path.join("storage", "Server", name)
    return basedir
def create(self, name):
    """Create, register, and return a StorageServer rooted in this
    test's working directory.

    NOTE(review): the truncated source was missing the 'return ss'
    line; without it every caller would receive None.
    """
    workdir = self.workdir(name)
    ss = StorageServer(workdir)
    ss.setNodeID("\x00" * 20)
    # parenting the service lets tearDown stop it cleanly
    ss.setServiceParent(self.sparent)
    return ss
def test_latencies(self):
    """Feed known latency samples into each category and check the
    computed mean and percentiles.

    The sample buffer retains only the most recent 1000 entries, so
    'allocate' (10000 samples of 1.0*i) keeps i in [9000, 10000) —
    hence the 9xxx percentile expectations below.

    NOTE(review): the 'for i in range(10):' header before the cancel
    samples was elided in the truncated source; it is restored here,
    consistent with the len == 10 and mean ~= 9 assertions.
    """
    ss = self.create("test_latencies")
    for i in range(10000):
        ss.add_latency("allocate", 1.0 * i)
    for i in range(1000):
        ss.add_latency("renew", 1.0 * i)
    for i in range(10):
        ss.add_latency("cancel", 2.0 * i)
    ss.add_latency("get", 5.0)

    output = ss.get_latencies()

    self.failUnlessEqual(sorted(output.keys()),
                         sorted(["allocate", "renew", "cancel", "get"]))
    self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
    self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
    self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
    self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
    self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
    self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
    self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
    self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
    self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

    self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
    self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
    self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
    self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
    self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
    self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
    self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
    self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
    self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

    self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
    self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
    self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
    self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
    self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
    self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
    self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
    self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
    self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

    self.failUnlessEqual(len(ss.latencies["get"]), 1)
    self.failUnless(abs(output["get"]["mean"] - 5) < 1)
    self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
    self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)