
from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage import BucketWriter, BucketReader, \
     StorageServer, MutableShareFile, \
     storage_index_to_dir, DataTooLargeError, LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

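# Marker is just an opaque handle. FakeCanary stands in for the "canary"
# that the storage protocol normally passes over Foolscap: a remote
# reference the server can watch to learn when the client disconnects. It
# provides just enough of that interface (notifyOnDisconnect /
# dontNotifyOnDisconnect), recording each disconnect handler so a test can
# fire them by hand (see Server.test_disconnect below).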
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

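# RemoteBucket adapts a local BucketWriter or BucketReader to the
# callRemote() calling convention, so the bucket proxies can be exercised
# without a real Foolscap connection: each call is dispatched to the
# matching remote_* method on self.target, and the result is wrapped in a
# Deferred via maybeDeferred.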
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
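        #
        # (Each merkle node above is a 32-byte hash, via
        # hashutil.tagged_hash; each share_hashes entry also carries a
        # 2-byte share number, hence the 3*(2+32) term; and the 4 in 4+500
        # appears to be the length field stored in front of the 500-byte
        # uri_extension.)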

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       segment_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size=len(uri_extension),
                       nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit,
                           stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but is not
        exactly the same storage index) does not add a new entry to the
        share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
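        # The incoming share file lives at
        # incoming/<prefix>/<storageindex>/<shnum>, so the three dirname()
        # calls above walk up to the bucket, prefix, and top-level incoming
        # directories. Once every share has been closed, the per-bucket and
        # per-prefix directories should have been pruned, leaving only the
        # top-level incoming directory.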
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
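        # (LEASE_SIZE is a 4-byte owner number, a 32-byte renew secret, a
        # 32-byte cancel secret, and a 4-byte expiration time: 72 bytes per
        # lease. OVERHEAD presumably corresponds to the three 4-byte fields
        # of the immutable share file's header.)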
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 20)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
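        # The test-and-write vector maps shnum -> (testv, datav, new_length):
        # each testv entry is an (offset, length, operator, specimen)
        # comparison that must pass, each datav entry is an (offset, data)
        # write to apply, and new_length (or None to leave it alone) sets
        # the new container size. An empty testv, as used here, always
        # passes.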
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # Test each operator; the data being compared is '11111' in every
        # case. Exercise both the failing and the passing case, and reset
        # the data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

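        # Each call below attempts a write guarded by a single test vector:
        # the write happens only if data[10:15] (which is "11111") compares
        # true against the specimen under the given operator. The return
        # value is (wrote, read_data); the read vector is evaluated before
        # the write, so it always reports the old contents.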
        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

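    # Helpers for comparing lists of (leasenum, LeaseInfo) pairs. The
    # "without_timestamps" variant ignores expiration_time, since renewing
    # a lease (or rewriting a share) legitimately pushes its expiration
    # forward.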
    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases", sizelimit=1000*1000)
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")


class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

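        # get_latencies() reports, per category, the mean and a set of
        # percentiles computed over a bounded sample of the most recent 1000
        # events: that is why 10000 "allocate" calls leave only 1000 samples
        # (values 9000..9999, whose mean is ~9500).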
        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)