
from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     StorageServer, MutableShareFile, \
     storage_index_to_dir, DataTooLargeError, LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

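# The helpers below are lightweight stand-ins for Foolscap machinery, so
# these tests can run without a real connection. As a sketch of the
# contract the tests rely on (not the authoritative Foolscap API): a
# canary lets the server register callbacks to be run if the client
# disconnects, and FakeCanary simply records those callbacks so a test
# can fire them by hand (see Server.test_disconnect) or ignore them.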
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

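    # A note on the write path exercised below (a sketch; the
    # authoritative behavior lives in allmydata.storage): BucketWriter
    # accumulates writes in the "incoming" file, and remote_close() is
    # expected to move the finished share to the "final" path -- which
    # is why the reads afterwards go through bw.finalhome.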
    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
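    # A minimal stand-in for a Foolscap RemoteReference: callRemote(name,
    # *args) dispatches to self.target.remote_<name>(*args) and wraps the
    # result in a Deferred via maybeDeferred, so the bucket proxies under
    # test see the same asynchronous interface they would get over the
    # wire. Sketch of intended use (the caller assigns .target):
    #
    #   rb = RemoteBucket()
    #   rb.target = bw                      # e.g. a BucketWriter
    #   d = rb.callRemote("write", 0, "a")  # -> bw.remote_write(0, "a")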

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
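        #
        # (Where do the 0x24 and 0x44 header sizes come from? Roughly --
        # allmydata.immutable.layout holds the authoritative struct
        # formats -- the v1 header is nine 4-byte fields: version,
        # segment size, data size, and six section offsets, so
        # 9*4 = 36 = 0x24 bytes. The v2 header keeps a 4-byte version but
        # widens the remaining eight fields to 8 bytes each, to allow
        # shares larger than 2**32 bytes: 4 + 8*8 = 68 = 0x44 bytes.)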

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       segment_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size=len(uri_extension),
                       nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit,
                           stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share's (but is not
        exactly the same storage index), this won't add a new entry to the
        share directory.
        """
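        # (Background, as a sketch: shares land under
        # shares/<prefix>/<si_dir>/<shnum>, where storage_index_to_dir()
        # derives both path components from the base32 encoding of the
        # storage index, and <prefix> is just the leading characters of
        # that encoding. So two storage indexes that agree in their
        # leading bytes should share a single prefix directory.)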
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
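        # (These constants mirror the on-disk immutable share format as we
        # understand it: the 3*4-byte header holds the version, the
        # share-data length, and the lease count, and each lease is a
        # 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time, hence 4+32+32+4 = 72 bytes.)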
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


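    # A summary of the lease protocol as these tests exercise it (the
    # remote interface is authoritative): the (renew_secret,
    # cancel_secret) pair supplied at allocation time becomes one lease
    # per bucket, remote_renew_lease() demands the exact renew secret,
    # remote_cancel_lease() demands the exact cancel secret, and
    # cancelling the last remaining lease deletes the share.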
    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

    def test_discard(self):
        # discard is really only used by other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 20)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

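    # A quick reference for the slot_testv_and_readv_and_writev argument
    # format used throughout these tests (paraphrased; the remote
    # interface definition is authoritative):
    #
    #   tw_vectors = { shnum: (testv, writev, new_length) }
    #     testv      = [ (offset, length, operator, specimen), ... ]
    #     writev     = [ (offset, data), ... ]
    #     new_length = new container size, or None to leave it alone
    #   read_vector  = [ (offset, length), ... ]
    #
    # The call returns (testv_passed, read_data), where read_data maps
    # each extant shnum to the data named by read_vector. The writes are
    # applied only if every test vector on every share passes.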
    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the operators; the data we're comparing against is '11111'
        # in all cases. test both fail and pass, resetting the data after
        # each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases", sizelimit=1000*1000)
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots using the same secrets; that should update
        # the existing lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")


class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
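        # (Note the implication: we fed 10000 "allocate" samples but only
        # 1000 survive, so the server evidently retains just the most
        # recent samples per category. The surviving values are 9000..9999,
        # which is why the mean below is ~9500 and why the percentile keys
        # -- named like "99_9_percentile" -- index into that tail.)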
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)