
from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage import BucketWriter, BucketReader, \
     StorageServer, MutableShareFile, \
     storage_index_to_dir, DataTooLargeError, LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

class Marker:
    pass
class FakeCanary:
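    """A stand-in for the Foolscap 'canary' object: it records the
    disconnect handlers that the storage server registers, so tests can
    invoke them later to simulate a client disconnecting (see
    Server.test_disconnect below)."""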
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
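    """A stats provider that accepts and discards everything; it satisfies
    the interface the StorageServer expects from a stats provider."""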
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
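    """Mimics a Foolscap remote reference: callRemote("name", ...) is
    dispatched to self.target.remote_name(...), with the result wrapped
    in a Deferred."""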

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
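
        # a quick sanity check of the arithmetic in the comment above
        # (0x24 and 0x44 are the v1 and v2 header sizes passed in by the
        # callers of this method):
        assert 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 == 1414
        assert 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 == 1446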

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       segment_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size=len(uri_extension),
                       nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class FakeDiskStorageServer(StorageServer):
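    """A StorageServer whose disk-space probe is stubbed out: stat_disk()
    returns self.DISKAVAIL instead of making a real statvfs() call, so
    tests can dictate how much free space the server believes it has."""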
    def stat_disk(self, d):
        return self.DISKAVAIL

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0,1,2], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an existing share (but is not the
        exact same storage index) does not add a new entry to the top-level
        share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
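        # (LEASE_SIZE matches the LeaseInfo fields: a 4-byte owner number,
        # two 32-byte secrets, and a 4-byte expiration time; OVERHEAD is
        # presumably the share file's three 4-byte header fields)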
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation, which grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the share files should be present but
        # hold no real contents, so the data we read back will be all zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 20)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
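        # each entry in the test-and-write vector maps a share number to a
        # (test vector, write vector, new length) tuple; an empty test
        # vector always passes, so this call simply creates empty shares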
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the comparison operators; the data being compared is '11111'
        # in every case. Test both failure and success, resetting the data
        # after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots using the same secrets; that should update
        # the existing lease rather than adding a new one
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")


class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)