1 from twisted.trial import unittest
3 from twisted.internet import defer
4 import time, os.path, stat
6 from allmydata import interfaces
7 from allmydata.util import fileutil, hashutil, base32
8 from allmydata.storage import BucketWriter, BucketReader, \
9 StorageServer, MutableShareFile, \
10 storage_index_to_dir, DataTooLargeError, LeaseInfo
11 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
13 from allmydata.interfaces import BadWriteEnablerError
14 from allmydata.test.common import LoggingServiceParent
19 def __init__(self, ignore_disconnectors=False):
20 self.ignore = ignore_disconnectors
21 self.disconnectors = {}
22 def notifyOnDisconnect(self, f, *args, **kwargs):
26 self.disconnectors[m] = (f, args, kwargs)
28 def dontNotifyOnDisconnect(self, marker):
31 del self.disconnectors[marker]
33 class FakeStatsProvider:
34 def count(self, name, delta=1):
36 def register_producer(self, producer):
39 class Bucket(unittest.TestCase):
40 def make_workdir(self, name):
41 basedir = os.path.join("storage", "Bucket", name)
42 incoming = os.path.join(basedir, "tmp", "bucket")
43 final = os.path.join(basedir, "bucket")
44 fileutil.make_dirs(basedir)
45 fileutil.make_dirs(os.path.join(basedir, "tmp"))
46 return incoming, final
48 def bucket_writer_closed(self, bw, consumed):
50 def add_latency(self, category, latency):
52 def count(self, name, delta=1):
57 renew_secret = os.urandom(32)
58 cancel_secret = os.urandom(32)
59 expiration_time = time.time() + 5000
60 return LeaseInfo(owner_num, renew_secret, cancel_secret,
61 expiration_time, "\x00" * 20)
63 def test_create(self):
64 incoming, final = self.make_workdir("test_create")
65 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
67 bw.remote_write(0, "a"*25)
68 bw.remote_write(25, "b"*25)
69 bw.remote_write(50, "c"*25)
70 bw.remote_write(75, "d"*7)
73 def test_readwrite(self):
74 incoming, final = self.make_workdir("test_readwrite")
75 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
77 bw.remote_write(0, "a"*25)
78 bw.remote_write(25, "b"*25)
79 bw.remote_write(50, "c"*7) # last block may be short
83 br = BucketReader(self, bw.finalhome)
84 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
85 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
86 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
90 def callRemote(self, methname, *args, **kwargs):
92 meth = getattr(self.target, "remote_" + methname)
93 return meth(*args, **kwargs)
94 return defer.maybeDeferred(_call)
96 class BucketProxy(unittest.TestCase):
97 def make_bucket(self, name, size):
98 basedir = os.path.join("storage", "BucketProxy", name)
99 incoming = os.path.join(basedir, "tmp", "bucket")
100 final = os.path.join(basedir, "bucket")
101 fileutil.make_dirs(basedir)
102 fileutil.make_dirs(os.path.join(basedir, "tmp"))
103 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
109 def make_lease(self):
111 renew_secret = os.urandom(32)
112 cancel_secret = os.urandom(32)
113 expiration_time = time.time() + 5000
114 return LeaseInfo(owner_num, renew_secret, cancel_secret,
115 expiration_time, "\x00" * 20)
117 def bucket_writer_closed(self, bw, consumed):
119 def add_latency(self, category, latency):
121 def count(self, name, delta=1):
124 def test_create(self):
125 bw, rb, sharefname = self.make_bucket("test_create", 500)
126 bp = WriteBucketProxy(rb,
131 uri_extension_size_max=500, nodeid=None)
132 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
134 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
135 # Let's pretend each share has 100 bytes of data, and that there are
136 # 4 segments (25 bytes each), and 8 shares total. So the two
137 # per-segment merkle trees (crypttext_hash_tree,
138 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
139 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
140 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
141 # long. That should make the whole share:
143 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
144 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
146 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
148 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
150 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
152 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
154 uri_extension = "s" + "E"*498 + "e"
156 bw, rb, sharefname = self.make_bucket(name, sharesize)
162 uri_extension_size_max=len(uri_extension),
166 d.addCallback(lambda res: bp.put_block(0, "a"*25))
167 d.addCallback(lambda res: bp.put_block(1, "b"*25))
168 d.addCallback(lambda res: bp.put_block(2, "c"*25))
169 d.addCallback(lambda res: bp.put_block(3, "d"*20))
170 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
171 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
172 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
173 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
174 d.addCallback(lambda res: bp.close())
176 # now read everything back
177 def _start_reading(res):
178 br = BucketReader(self, sharefname)
181 rbp = rbp_class(rb, peerid="abc", storage_index="")
182 self.failUnless("to peer" in repr(rbp))
183 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
185 d1 = rbp.get_block_data(0, 25, 25)
186 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
187 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
188 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
189 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
190 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
191 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
192 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
194 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
195 d1.addCallback(lambda res:
196 self.failUnlessEqual(res, crypttext_hashes))
197 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
198 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
199 d1.addCallback(lambda res: rbp.get_share_hashes())
200 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
201 d1.addCallback(lambda res: rbp.get_uri_extension())
202 d1.addCallback(lambda res:
203 self.failUnlessEqual(res, uri_extension))
207 d.addCallback(_start_reading)
    def test_readwrite_v1(self):
        # v1 write/read proxies: share layout with a 0x24-byte header
        # (see the size arithmetic in _do_test_readwrite)
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)
    def test_readwrite_v2(self):
        # v2 write/read proxies: share layout with a 0x44-byte header
        # (see the size arithmetic in _do_test_readwrite)
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    # Test double: reports a canned amount of available disk space
    # instead of doing a real statvfs() call. Tests assign DISKAVAIL
    # on the instance (and adjust it by hand) to simulate disk usage;
    # the directory argument 'd' is ignored.
    def stat_disk(self, d):
        return self.DISKAVAIL
223 class Server(unittest.TestCase):
226 self.sparent = LoggingServiceParent()
227 self._lease_secret = itertools.count()
229 return self.sparent.stopService()
231 def workdir(self, name):
232 basedir = os.path.join("storage", "Server", name)
235 def create(self, name, reserved_space=0, klass=StorageServer):
236 workdir = self.workdir(name)
237 ss = klass(workdir, reserved_space=reserved_space,
238 stats_provider=FakeStatsProvider())
239 ss.setNodeID("\x00" * 20)
240 ss.setServiceParent(self.sparent)
    def test_create(self):
        # smoke test: just check that a StorageServer can be instantiated
        ss = self.create("test_create")
246 def allocate(self, ss, storage_index, sharenums, size, canary=None):
247 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
248 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
250 canary = FakeCanary()
251 return ss.remote_allocate_buckets(storage_index,
252 renew_secret, cancel_secret,
253 sharenums, size, canary)
255 def test_large_share(self):
256 ss = self.create("test_large_share")
258 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
259 self.failUnlessEqual(already, set())
260 self.failUnlessEqual(set(writers.keys()), set([0]))
262 shnum, bucket = writers.items()[0]
263 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
264 bucket.remote_write(2**32, "ab")
265 bucket.remote_close()
267 readers = ss.remote_get_buckets("allocate")
268 reader = readers[shnum]
269 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
270 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
272 def test_dont_overfill_dirs(self):
274 This test asserts that if you add a second share whose storage index
shares lots of leading bits with an extant share (but isn't the exact
276 same storage index), this won't add an entry to the share directory.
278 ss = self.create("test_dont_overfill_dirs")
279 already, writers = self.allocate(ss, "storageindex", [0], 10)
280 for i, wb in writers.items():
281 wb.remote_write(0, "%10d" % i)
283 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
285 children_of_storedir = set(os.listdir(storedir))
287 # Now store another one under another storageindex that has leading
288 # chars the same as the first storageindex.
289 already, writers = self.allocate(ss, "storageindey", [0], 10)
290 for i, wb in writers.items():
291 wb.remote_write(0, "%10d" % i)
293 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
295 new_children_of_storedir = set(os.listdir(storedir))
296 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
298 def test_remove_incoming(self):
299 ss = self.create("test_remove_incoming")
300 already, writers = self.allocate(ss, "vid", range(3), 10)
301 for i,wb in writers.items():
302 wb.remote_write(0, "%10d" % i)
304 incoming_share_dir = wb.incominghome
305 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
306 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
307 incoming_dir = os.path.dirname(incoming_prefix_dir)
308 self.failIf(os.path.exists(incoming_bucket_dir))
309 self.failIf(os.path.exists(incoming_prefix_dir))
310 self.failUnless(os.path.exists(incoming_dir))
312 def test_allocate(self):
313 ss = self.create("test_allocate")
315 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
317 canary = FakeCanary()
318 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
319 self.failUnlessEqual(already, set())
320 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
322 # while the buckets are open, they should not count as readable
323 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
326 for i,wb in writers.items():
327 wb.remote_write(0, "%25d" % i)
329 # aborting a bucket that was already closed is a no-op
332 # now they should be readable
333 b = ss.remote_get_buckets("allocate")
334 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
335 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
337 # now if we ask about writing again, the server should offer those
338 # three buckets as already present. It should offer them even if we
339 # don't ask about those specific ones.
340 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
341 self.failUnlessEqual(already, set([0,1,2]))
342 self.failUnlessEqual(set(writers.keys()), set([3,4]))
344 # while those two buckets are open for writing, the server should
345 # refuse to offer them to uploaders
347 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
348 self.failUnlessEqual(already2, set([0,1,2]))
349 self.failUnlessEqual(set(writers2.keys()), set([5]))
351 # aborting the writes should remove the tempfiles
352 for i,wb in writers2.items():
354 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
355 self.failUnlessEqual(already2, set([0,1,2]))
356 self.failUnlessEqual(set(writers2.keys()), set([5]))
358 for i,wb in writers2.items():
360 for i,wb in writers.items():
363 def test_disconnect(self):
364 # simulate a disconnection
365 ss = self.create("test_disconnect")
366 canary = FakeCanary()
367 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
368 self.failUnlessEqual(already, set())
369 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
370 for (f,args,kwargs) in canary.disconnectors.values():
375 # that ought to delete the incoming shares
376 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
377 self.failUnlessEqual(already, set())
378 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
380 def test_reserved_space(self):
381 ss = self.create("test_reserved_space", reserved_space=10000,
382 klass=FakeDiskStorageServer)
383 # the FakeDiskStorageServer doesn't do real statvfs() calls
385 # 15k available, 10k reserved, leaves 5k for shares
387 # a newly created and filled share incurs this much overhead, beyond
388 # the size we request.
390 LEASE_SIZE = 4+32+32+4
391 canary = FakeCanary(True)
392 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
393 self.failUnlessEqual(len(writers), 3)
394 # now the StorageServer should have 3000 bytes provisionally
395 # allocated, allowing only 2000 more to be claimed
396 self.failUnlessEqual(len(ss._active_writers), 3)
398 # allocating 1001-byte shares only leaves room for one
399 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
400 self.failUnlessEqual(len(writers2), 1)
401 self.failUnlessEqual(len(ss._active_writers), 4)
403 # we abandon the first set, so their provisional allocation should be
407 self.failUnlessEqual(len(ss._active_writers), 1)
408 # now we have a provisional allocation of 1001 bytes
410 # and we close the second set, so their provisional allocation should
411 # become real, long-term allocation, and grows to include the
413 for bw in writers2.values():
414 bw.remote_write(0, "a"*25)
419 self.failUnlessEqual(len(ss._active_writers), 0)
421 allocated = 1001 + OVERHEAD + LEASE_SIZE
423 # we have to manually increase DISKAVAIL, since we're not doing real
425 ss.DISKAVAIL -= allocated
427 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
428 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
429 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
430 self.failUnlessEqual(len(writers3), 39)
431 self.failUnlessEqual(len(ss._active_writers), 39)
435 self.failUnlessEqual(len(ss._active_writers), 0)
436 ss.disownServiceParent()
440 basedir = self.workdir("test_seek_behavior")
441 fileutil.make_dirs(basedir)
442 filename = os.path.join(basedir, "testfile")
443 f = open(filename, "wb")
446 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
447 # files. mode="a" preserves previous contents but does not allow
448 # seeking-to-create-holes. mode="r+" allows both.
449 f = open(filename, "rb+")
453 filelen = os.stat(filename)[stat.ST_SIZE]
454 self.failUnlessEqual(filelen, 100+3)
455 f2 = open(filename, "rb")
456 self.failUnlessEqual(f2.read(5), "start")
459 def test_leases(self):
460 ss = self.create("test_leases")
461 canary = FakeCanary()
465 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
466 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
467 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
468 sharenums, size, canary)
469 self.failUnlessEqual(len(already), 0)
470 self.failUnlessEqual(len(writers), 5)
471 for wb in writers.values():
474 leases = list(ss.get_leases("si0"))
475 self.failUnlessEqual(len(leases), 1)
476 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
478 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
479 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
480 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
481 sharenums, size, canary)
482 for wb in writers.values():
485 # take out a second lease on si1
486 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
487 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
488 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
489 sharenums, size, canary)
490 self.failUnlessEqual(len(already), 5)
491 self.failUnlessEqual(len(writers), 0)
493 leases = list(ss.get_leases("si1"))
494 self.failUnlessEqual(len(leases), 2)
495 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
497 # and a third lease, using add-lease
498 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
499 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
500 ss.remote_add_lease("si1", rs2a, cs2a)
501 leases = list(ss.get_leases("si1"))
502 self.failUnlessEqual(len(leases), 3)
503 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
505 # add-lease on a missing storage index is silently ignored
506 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
508 # check that si0 is readable
509 readers = ss.remote_get_buckets("si0")
510 self.failUnlessEqual(len(readers), 5)
512 # renew the first lease. Only the proper renew_secret should work
513 ss.remote_renew_lease("si0", rs0)
514 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
515 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
517 # check that si0 is still readable
518 readers = ss.remote_get_buckets("si0")
519 self.failUnlessEqual(len(readers), 5)
522 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
523 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
524 ss.remote_cancel_lease("si0", cs0)
526 # si0 should now be gone
527 readers = ss.remote_get_buckets("si0")
528 self.failUnlessEqual(len(readers), 0)
529 # and the renew should no longer work
530 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
533 # cancel the first lease on si1, leaving the second and third in place
534 ss.remote_cancel_lease("si1", cs1)
535 readers = ss.remote_get_buckets("si1")
536 self.failUnlessEqual(len(readers), 5)
537 # the corresponding renew should no longer work
538 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
540 leases = list(ss.get_leases("si1"))
541 self.failUnlessEqual(len(leases), 2)
542 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
544 ss.remote_renew_lease("si1", rs2)
545 # cancelling the second and third should make it go away
546 ss.remote_cancel_lease("si1", cs2)
547 ss.remote_cancel_lease("si1", cs2a)
548 readers = ss.remote_get_buckets("si1")
549 self.failUnlessEqual(len(readers), 0)
550 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
551 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
552 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
554 leases = list(ss.get_leases("si1"))
555 self.failUnlessEqual(len(leases), 0)
558 # test overlapping uploads
559 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
560 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
561 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
562 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
563 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
564 sharenums, size, canary)
565 self.failUnlessEqual(len(already), 0)
566 self.failUnlessEqual(len(writers), 5)
567 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
568 sharenums, size, canary)
569 self.failUnlessEqual(len(already2), 0)
570 self.failUnlessEqual(len(writers2), 0)
571 for wb in writers.values():
574 leases = list(ss.get_leases("si3"))
575 self.failUnlessEqual(len(leases), 1)
577 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
578 sharenums, size, canary)
579 self.failUnlessEqual(len(already3), 5)
580 self.failUnlessEqual(len(writers3), 0)
582 leases = list(ss.get_leases("si3"))
583 self.failUnlessEqual(len(leases), 2)
585 def test_readonly(self):
586 workdir = self.workdir("test_readonly")
587 ss = StorageServer(workdir, readonly_storage=True)
588 ss.setNodeID("\x00" * 20)
589 ss.setServiceParent(self.sparent)
591 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
592 self.failUnlessEqual(already, set())
593 self.failUnlessEqual(writers, {})
595 stats = ss.get_stats()
596 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
598 if "storage_server.disk_avail" in stats:
599 # windows does not have os.statvfs, so it doesn't give us disk
600 # stats. But if there are stats, readonly_storage means
602 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
604 def test_discard(self):
605 # discard is really only used for other tests, but we test it anyways
606 workdir = self.workdir("test_discard")
607 ss = StorageServer(workdir, discard_storage=True)
608 ss.setNodeID("\x00" * 20)
609 ss.setServiceParent(self.sparent)
611 canary = FakeCanary()
612 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
613 self.failUnlessEqual(already, set())
614 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
615 for i,wb in writers.items():
616 wb.remote_write(0, "%25d" % i)
618 # since we discard the data, the shares should be present but sparse.
619 # Since we write with some seeks, the data we read back will be all
621 b = ss.remote_get_buckets("vid")
622 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
623 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
625 def test_advise_corruption(self):
626 workdir = self.workdir("test_advise_corruption")
627 ss = StorageServer(workdir, discard_storage=True)
628 ss.setNodeID("\x00" * 20)
629 ss.setServiceParent(self.sparent)
631 si0_s = base32.b2a("si0")
632 ss.remote_advise_corrupt_share("immutable", "si0", 0,
633 "This share smells funny.\n")
634 reportdir = os.path.join(workdir, "corruption-advisories")
635 reports = os.listdir(reportdir)
636 self.failUnlessEqual(len(reports), 1)
637 report_si0 = reports[0]
638 self.failUnless(si0_s in report_si0, report_si0)
639 f = open(os.path.join(reportdir, report_si0), "r")
642 self.failUnless("type: immutable" in report)
643 self.failUnless(("storage_index: %s" % si0_s) in report)
644 self.failUnless("share_number: 0" in report)
645 self.failUnless("This share smells funny." in report)
647 # test the RIBucketWriter version too
648 si1_s = base32.b2a("si1")
649 already,writers = self.allocate(ss, "si1", [1], 75)
650 self.failUnlessEqual(already, set())
651 self.failUnlessEqual(set(writers.keys()), set([1]))
652 writers[1].remote_write(0, "data")
653 writers[1].remote_close()
655 b = ss.remote_get_buckets("si1")
656 self.failUnlessEqual(set(b.keys()), set([1]))
657 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
659 reports = os.listdir(reportdir)
660 self.failUnlessEqual(len(reports), 2)
661 report_si1 = [r for r in reports if si1_s in r][0]
662 f = open(os.path.join(reportdir, report_si1), "r")
665 self.failUnless("type: immutable" in report)
666 self.failUnless(("storage_index: %s" % si1_s) in report)
667 self.failUnless("share_number: 1" in report)
668 self.failUnless("This share tastes like dust." in report)
672 class MutableServer(unittest.TestCase):
675 self.sparent = LoggingServiceParent()
676 self._lease_secret = itertools.count()
678 return self.sparent.stopService()
680 def workdir(self, name):
681 basedir = os.path.join("storage", "MutableServer", name)
684 def create(self, name):
685 workdir = self.workdir(name)
686 ss = StorageServer(workdir)
687 ss.setServiceParent(self.sparent)
688 ss.setNodeID("\x00" * 20)
    def test_create(self):
        # smoke test: just check that a StorageServer can be instantiated
        ss = self.create("test_create")
    def write_enabler(self, we_tag):
        # deterministic write-enabler secret derived from the given tag
        return hashutil.tagged_hash("we_blah", we_tag)
    def renew_secret(self, tag):
        # deterministic lease renew-secret; tag is stringified first so
        # callers may pass non-string values (e.g. integers)
        return hashutil.tagged_hash("renew_blah", str(tag))
    def cancel_secret(self, tag):
        # deterministic lease cancel-secret; tag is stringified first so
        # callers may pass non-string values (e.g. integers)
        return hashutil.tagged_hash("cancel_blah", str(tag))
703 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
704 write_enabler = self.write_enabler(we_tag)
705 renew_secret = self.renew_secret(lease_tag)
706 cancel_secret = self.cancel_secret(lease_tag)
707 rstaraw = ss.remote_slot_testv_and_readv_and_writev
708 testandwritev = dict( [ (shnum, ([], [], None) )
709 for shnum in sharenums ] )
711 rc = rstaraw(storage_index,
712 (write_enabler, renew_secret, cancel_secret),
715 (did_write, readv_data) = rc
716 self.failUnless(did_write)
717 self.failUnless(isinstance(readv_data, dict))
718 self.failUnlessEqual(len(readv_data), 0)
720 def test_container_size(self):
721 ss = self.create("test_container_size")
722 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
724 rstaraw = ss.remote_slot_testv_and_readv_and_writev
725 secrets = ( self.write_enabler("we1"),
726 self.renew_secret("we1"),
727 self.cancel_secret("we1") )
728 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
729 answer = rstaraw("si1", secrets,
730 {0: ([], [(0,data)], len(data)+12)},
732 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
734 # trying to make the container too large will raise an exception
735 TOOBIG = MutableShareFile.MAX_SIZE + 10
736 self.failUnlessRaises(DataTooLargeError,
737 rstaraw, "si1", secrets,
738 {0: ([], [(0,data)], TOOBIG)},
741 # it should be possible to make the container smaller, although at
742 # the moment this doesn't actually affect the share
743 answer = rstaraw("si1", secrets,
744 {0: ([], [(0,data)], len(data)+8)},
746 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
748 def test_allocate(self):
749 ss = self.create("test_allocate")
750 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
753 read = ss.remote_slot_readv
754 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
756 self.failUnlessEqual(read("si1", [], [(0, 10)]),
757 {0: [""], 1: [""], 2: [""]})
758 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
762 secrets = ( self.write_enabler("we1"),
763 self.renew_secret("we1"),
764 self.cancel_secret("we1") )
765 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
766 write = ss.remote_slot_testv_and_readv_and_writev
767 answer = write("si1", secrets,
768 {0: ([], [(0,data)], None)},
770 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
772 self.failUnlessEqual(read("si1", [0], [(0,20)]),
773 {0: ["00000000001111111111"]})
774 self.failUnlessEqual(read("si1", [0], [(95,10)]),
776 #self.failUnlessEqual(s0.remote_get_length(), 100)
778 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
779 f = self.failUnlessRaises(BadWriteEnablerError,
780 write, "si1", bad_secrets,
782 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
784 # this testv should fail
785 answer = write("si1", secrets,
786 {0: ([(0, 12, "eq", "444444444444"),
787 (20, 5, "eq", "22222"),
794 self.failUnlessEqual(answer, (False,
795 {0: ["000000000011", "22222"],
799 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
802 answer = write("si1", secrets,
803 {0: ([(10, 5, "lt", "11111"),
810 self.failUnlessEqual(answer, (False,
815 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
818 def test_operators(self):
819 # test operators, the data we're comparing is '11111' in all cases.
820 # test both fail+pass, reset data after each one.
821 ss = self.create("test_operators")
823 secrets = ( self.write_enabler("we1"),
824 self.renew_secret("we1"),
825 self.cancel_secret("we1") )
826 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
827 write = ss.remote_slot_testv_and_readv_and_writev
828 read = ss.remote_slot_readv
831 write("si1", secrets,
832 {0: ([], [(0,data)], None)},
838 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
843 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
844 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
845 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
848 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
853 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
854 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
857 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
862 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
863 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
867 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
872 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
873 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
876 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
881 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
882 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
885 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
890 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
891 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
895 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
900 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
901 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
904 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
909 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
910 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
914 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
919 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
920 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
923 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
928 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
929 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
933 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
938 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
939 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
942 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
947 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
948 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
951 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
956 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
957 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
961 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
966 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
967 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
970 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
975 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
976 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
979 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
984 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
985 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
988 # finally, test some operators against empty shares
989 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
994 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
995 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
998 def test_readv(self):
999 ss = self.create("test_readv")
1000 secrets = ( self.write_enabler("we1"),
1001 self.renew_secret("we1"),
1002 self.cancel_secret("we1") )
1003 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1004 write = ss.remote_slot_testv_and_readv_and_writev
1005 read = ss.remote_slot_readv
1006 data = [("%d" % i) * 100 for i in range(3)]
1007 rc = write("si1", secrets,
1008 {0: ([], [(0,data[0])], None),
1009 1: ([], [(0,data[1])], None),
1010 2: ([], [(0,data[2])], None),
1012 self.failUnlessEqual(rc, (True, {}))
1014 answer = read("si1", [], [(0, 10)])
1015 self.failUnlessEqual(answer, {0: ["0"*10],
1019 def compare_leases_without_timestamps(self, leases_a, leases_b):
1020 self.failUnlessEqual(len(leases_a), len(leases_b))
1021 for i in range(len(leases_a)):
1022 num_a, a = leases_a[i]
1023 num_b, b = leases_b[i]
1024 self.failUnlessEqual(num_a, num_b)
1025 self.failUnlessEqual(a.owner_num, b.owner_num)
1026 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1027 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1028 self.failUnlessEqual(a.nodeid, b.nodeid)
1030 def compare_leases(self, leases_a, leases_b):
1031 self.failUnlessEqual(len(leases_a), len(leases_b))
1032 for i in range(len(leases_a)):
1033 num_a, a = leases_a[i]
1034 num_b, b = leases_b[i]
1035 self.failUnlessEqual(num_a, num_b)
1036 self.failUnlessEqual(a.owner_num, b.owner_num)
1037 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1038 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1039 self.failUnlessEqual(a.nodeid, b.nodeid)
1040 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1042 def test_leases(self):
1043 ss = self.create("test_leases")
1045 return ( self.write_enabler("we1"),
1046 self.renew_secret("we1-%d" % n),
1047 self.cancel_secret("we1-%d" % n) )
1048 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1049 write = ss.remote_slot_testv_and_readv_and_writev
1050 read = ss.remote_slot_readv
1051 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1052 self.failUnlessEqual(rc, (True, {}))
1054 # create a random non-numeric file in the bucket directory, to
1055 # exercise the code that's supposed to ignore those.
1056 bucket_dir = os.path.join(self.workdir("test_leases"),
1057 "shares", storage_index_to_dir("si1"))
1058 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1059 f.write("you ought to be ignoring me\n")
1062 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1063 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1065 # add-lease on a missing storage index is silently ignored
1066 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1068 # re-allocate the slots and use the same secrets, that should update
1070 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1071 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1074 ss.remote_renew_lease("si1", secrets(0)[1])
1075 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1077 # now allocate them with a bunch of different secrets, to trigger the
1078 # extended lease code. Use add_lease for one of them.
1079 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1080 self.failUnlessEqual(len(s0.debug_get_leases()), 2)
1081 secrets2 = secrets(2)
1082 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1083 self.failUnlessEqual(len(s0.debug_get_leases()), 3)
1084 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1085 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1086 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1088 self.failUnlessEqual(len(s0.debug_get_leases()), 6)
1090 # cancel one of them
1091 ss.remote_cancel_lease("si1", secrets(5)[2])
1092 self.failUnlessEqual(len(s0.debug_get_leases()), 5)
1094 all_leases = s0.debug_get_leases()
1095 # and write enough data to expand the container, forcing the server
1096 # to move the leases
1097 write("si1", secrets(0),
1098 {0: ([], [(0,data)], 200), },
1101 # read back the leases, make sure they're still intact.
1102 self.compare_leases_without_timestamps(all_leases,
1103 s0.debug_get_leases())
1105 ss.remote_renew_lease("si1", secrets(0)[1])
1106 ss.remote_renew_lease("si1", secrets(1)[1])
1107 ss.remote_renew_lease("si1", secrets(2)[1])
1108 ss.remote_renew_lease("si1", secrets(3)[1])
1109 ss.remote_renew_lease("si1", secrets(4)[1])
1110 self.compare_leases_without_timestamps(all_leases,
1111 s0.debug_get_leases())
1112 # get a new copy of the leases, with the current timestamps. Reading
1113 # data and failing to renew/cancel leases should leave the timestamps
1115 all_leases = s0.debug_get_leases()
1116 # renewing with a bogus token should prompt an error message
1118 # examine the exception thus raised, make sure the old nodeid is
1119 # present, to provide for share migration
1120 e = self.failUnlessRaises(IndexError,
1121 ss.remote_renew_lease, "si1",
1124 self.failUnless("Unable to renew non-existent lease" in e_s)
1125 self.failUnless("I have leases accepted by nodeids:" in e_s)
1126 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1128 # same for cancelling
1129 self.failUnlessRaises(IndexError,
1130 ss.remote_cancel_lease, "si1",
1132 self.compare_leases(all_leases, s0.debug_get_leases())
1134 # reading shares should not modify the timestamp
1135 read("si1", [], [(0,200)])
1136 self.compare_leases(all_leases, s0.debug_get_leases())
1138 write("si1", secrets(0),
1139 {0: ([], [(200, "make me bigger")], None)}, [])
1140 self.compare_leases_without_timestamps(all_leases,
1141 s0.debug_get_leases())
1143 write("si1", secrets(0),
1144 {0: ([], [(500, "make me really bigger")], None)}, [])
1145 self.compare_leases_without_timestamps(all_leases,
1146 s0.debug_get_leases())
1148 # now cancel them all
1149 ss.remote_cancel_lease("si1", secrets(0)[2])
1150 ss.remote_cancel_lease("si1", secrets(1)[2])
1151 ss.remote_cancel_lease("si1", secrets(2)[2])
1152 ss.remote_cancel_lease("si1", secrets(3)[2])
1154 # the slot should still be there
1155 remaining_shares = read("si1", [], [(0,10)])
1156 self.failUnlessEqual(len(remaining_shares), 1)
1157 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1159 # cancelling a non-existent lease should raise an IndexError
1160 self.failUnlessRaises(IndexError,
1161 ss.remote_cancel_lease, "si1", "nonsecret")
1163 # and the slot should still be there
1164 remaining_shares = read("si1", [], [(0,10)])
1165 self.failUnlessEqual(len(remaining_shares), 1)
1166 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1168 ss.remote_cancel_lease("si1", secrets(4)[2])
1169 # now the slot should be gone
1170 no_shares = read("si1", [], [(0,10)])
1171 self.failUnlessEqual(no_shares, {})
1173 # cancelling a lease on a non-existent share should raise an IndexError
1174 self.failUnlessRaises(IndexError,
1175 ss.remote_cancel_lease, "si2", "nonsecret")
1177 def test_remove(self):
1178 ss = self.create("test_remove")
1179 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1181 readv = ss.remote_slot_readv
1182 writev = ss.remote_slot_testv_and_readv_and_writev
1183 secrets = ( self.write_enabler("we1"),
1184 self.renew_secret("we1"),
1185 self.cancel_secret("we1") )
1186 # delete sh0 by setting its size to zero
1187 answer = writev("si1", secrets,
1190 # the answer should mention all the shares that existed before the
1192 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1193 # but a new read should show only sh1 and sh2
1194 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1197 # delete sh1 by setting its size to zero
1198 answer = writev("si1", secrets,
1201 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1202 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1205 # delete sh2 by setting its size to zero
1206 answer = writev("si1", secrets,
1209 self.failUnlessEqual(answer, (True, {2:[]}) )
1210 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1212 # and the bucket directory should now be gone
1213 si = base32.b2a("si1")
1214 # note: this is a detail of the storage server implementation, and
1215 # may change in the future
1217 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1218 bucketdir = os.path.join(prefixdir, si)
1219 self.failUnless(os.path.exists(prefixdir))
1220 self.failIf(os.path.exists(bucketdir))
1222 class Stats(unittest.TestCase):
1225 self.sparent = LoggingServiceParent()
1226 self._lease_secret = itertools.count()
1228 return self.sparent.stopService()
1230 def workdir(self, name):
1231 basedir = os.path.join("storage", "Server", name)
1234 def create(self, name):
1235 workdir = self.workdir(name)
1236 ss = StorageServer(workdir)
1237 ss.setNodeID("\x00" * 20)
1238 ss.setServiceParent(self.sparent)
1241 def test_latencies(self):
1242 ss = self.create("test_latencies")
1243 for i in range(10000):
1244 ss.add_latency("allocate", 1.0 * i)
1245 for i in range(1000):
1246 ss.add_latency("renew", 1.0 * i)
1248 ss.add_latency("cancel", 2.0 * i)
1249 ss.add_latency("get", 5.0)
1251 output = ss.get_latencies()
1253 self.failUnlessEqual(sorted(output.keys()),
1254 sorted(["allocate", "renew", "cancel", "get"]))
1255 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1256 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1257 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1258 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1259 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1260 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1261 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1262 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1263 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1265 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1266 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1267 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1268 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1269 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1270 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1271 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1272 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1273 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1275 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1276 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1277 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1278 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1279 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1280 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1281 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1282 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1283 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1285 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1286 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1287 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1288 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1289 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1290 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1291 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1292 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1293 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)