]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
test_storage.py: more windows-vs-readonly-storage fixes
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 from twisted.trial import unittest
3
4 from twisted.internet import defer
5 import time, os.path, stat
6 import itertools
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil, base32
9 from allmydata.storage import BucketWriter, BucketReader, \
10      StorageServer, MutableShareFile, \
11      storage_index_to_dir, DataTooLargeError, LeaseInfo
12 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
13      ReadBucketProxy
14 from allmydata.interfaces import BadWriteEnablerError
15 from allmydata.test.common import LoggingServiceParent
16
17 class Marker:
18     pass
19 class FakeCanary:
20     def __init__(self, ignore_disconnectors=False):
21         self.ignore = ignore_disconnectors
22         self.disconnectors = {}
23     def notifyOnDisconnect(self, f, *args, **kwargs):
24         if self.ignore:
25             return
26         m = Marker()
27         self.disconnectors[m] = (f, args, kwargs)
28         return m
29     def dontNotifyOnDisconnect(self, marker):
30         if self.ignore:
31             return
32         del self.disconnectors[marker]
33
34 class FakeStatsProvider:
35     def count(self, name, delta=1):
36         pass
37     def register_producer(self, producer):
38         pass
39
40 class Bucket(unittest.TestCase):
41     def make_workdir(self, name):
42         basedir = os.path.join("storage", "Bucket", name)
43         incoming = os.path.join(basedir, "tmp", "bucket")
44         final = os.path.join(basedir, "bucket")
45         fileutil.make_dirs(basedir)
46         fileutil.make_dirs(os.path.join(basedir, "tmp"))
47         return incoming, final
48
49     def bucket_writer_closed(self, bw, consumed):
50         pass
51     def add_latency(self, category, latency):
52         pass
53     def count(self, name, delta=1):
54         pass
55
56     def make_lease(self):
57         owner_num = 0
58         renew_secret = os.urandom(32)
59         cancel_secret = os.urandom(32)
60         expiration_time = time.time() + 5000
61         return LeaseInfo(owner_num, renew_secret, cancel_secret,
62                          expiration_time, "\x00" * 20)
63
64     def test_create(self):
65         incoming, final = self.make_workdir("test_create")
66         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
67                           FakeCanary())
68         bw.remote_write(0, "a"*25)
69         bw.remote_write(25, "b"*25)
70         bw.remote_write(50, "c"*25)
71         bw.remote_write(75, "d"*7)
72         bw.remote_close()
73
74     def test_readwrite(self):
75         incoming, final = self.make_workdir("test_readwrite")
76         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
77                           FakeCanary())
78         bw.remote_write(0, "a"*25)
79         bw.remote_write(25, "b"*25)
80         bw.remote_write(50, "c"*7) # last block may be short
81         bw.remote_close()
82
83         # now read from it
84         br = BucketReader(self, bw.finalhome)
85         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
86         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
87         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
88
89 class RemoteBucket:
90
91     def callRemote(self, methname, *args, **kwargs):
92         def _call():
93             meth = getattr(self.target, "remote_" + methname)
94             return meth(*args, **kwargs)
95         return defer.maybeDeferred(_call)
96
97 class BucketProxy(unittest.TestCase):
98     def make_bucket(self, name, size):
99         basedir = os.path.join("storage", "BucketProxy", name)
100         incoming = os.path.join(basedir, "tmp", "bucket")
101         final = os.path.join(basedir, "bucket")
102         fileutil.make_dirs(basedir)
103         fileutil.make_dirs(os.path.join(basedir, "tmp"))
104         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
105                           FakeCanary())
106         rb = RemoteBucket()
107         rb.target = bw
108         return bw, rb, final
109
110     def make_lease(self):
111         owner_num = 0
112         renew_secret = os.urandom(32)
113         cancel_secret = os.urandom(32)
114         expiration_time = time.time() + 5000
115         return LeaseInfo(owner_num, renew_secret, cancel_secret,
116                          expiration_time, "\x00" * 20)
117
118     def bucket_writer_closed(self, bw, consumed):
119         pass
120     def add_latency(self, category, latency):
121         pass
122     def count(self, name, delta=1):
123         pass
124
125     def test_create(self):
126         bw, rb, sharefname = self.make_bucket("test_create", 500)
127         bp = WriteBucketProxy(rb,
128                               data_size=300,
129                               segment_size=10,
130                               num_segments=5,
131                               num_share_hashes=3,
132                               uri_extension_size=500, nodeid=None)
133         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
134
135     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
136         # Let's pretend each share has 100 bytes of data, and that there are
137         # 4 segments (25 bytes each), and 8 shares total. So the three
138         # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
139         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
140         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
141         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
142         # long. That should make the whole share:
143         #
144         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
145         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
146
147         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
148
149         plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
150                             for i in range(7)]
151         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
152                             for i in range(7)]
153         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
154                         for i in range(7)]
155         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
156                         for i in (1,9,13)]
157         uri_extension = "s" + "E"*498 + "e"
158
159         bw, rb, sharefname = self.make_bucket(name, sharesize)
160         bp = wbp_class(rb,
161                        data_size=95,
162                        segment_size=25,
163                        num_segments=4,
164                        num_share_hashes=3,
165                        uri_extension_size=len(uri_extension),
166                        nodeid=None)
167
168         d = bp.start()
169         d.addCallback(lambda res: bp.put_block(0, "a"*25))
170         d.addCallback(lambda res: bp.put_block(1, "b"*25))
171         d.addCallback(lambda res: bp.put_block(2, "c"*25))
172         d.addCallback(lambda res: bp.put_block(3, "d"*20))
173         d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
174         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
175         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
176         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
177         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
178         d.addCallback(lambda res: bp.close())
179
180         # now read everything back
181         def _start_reading(res):
182             br = BucketReader(self, sharefname)
183             rb = RemoteBucket()
184             rb.target = br
185             rbp = rbp_class(rb, peerid="abc")
186             self.failUnless("to peer" in repr(rbp))
187             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
188
189             d1 = rbp.startIfNecessary()
190             d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
191             d1.addCallback(lambda res: rbp.get_block(0))
192             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
193             d1.addCallback(lambda res: rbp.get_block(1))
194             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
195             d1.addCallback(lambda res: rbp.get_block(2))
196             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
197             d1.addCallback(lambda res: rbp.get_block(3))
198             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
199
200             d1.addCallback(lambda res: rbp.get_plaintext_hashes())
201             d1.addCallback(lambda res:
202                            self.failUnlessEqual(res, plaintext_hashes))
203             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
204             d1.addCallback(lambda res:
205                            self.failUnlessEqual(res, crypttext_hashes))
206             d1.addCallback(lambda res: rbp.get_block_hashes())
207             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
208             d1.addCallback(lambda res: rbp.get_share_hashes())
209             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
210             d1.addCallback(lambda res: rbp.get_uri_extension())
211             d1.addCallback(lambda res:
212                            self.failUnlessEqual(res, uri_extension))
213
214             return d1
215
216         d.addCallback(_start_reading)
217
218         return d
219
220     def test_readwrite_v1(self):
221         return self._do_test_readwrite("test_readwrite_v1",
222                                        0x24, WriteBucketProxy, ReadBucketProxy)
223
224     def test_readwrite_v2(self):
225         return self._do_test_readwrite("test_readwrite_v2",
226                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
227
228 class FakeDiskStorageServer(StorageServer):
229     def stat_disk(self, d):
230         return self.DISKAVAIL
231
232 class Server(unittest.TestCase):
233
234     def setUp(self):
235         self.sparent = LoggingServiceParent()
236         self._lease_secret = itertools.count()
237     def tearDown(self):
238         return self.sparent.stopService()
239
240     def workdir(self, name):
241         basedir = os.path.join("storage", "Server", name)
242         return basedir
243
244     def create(self, name, reserved_space=0, klass=StorageServer):
245         workdir = self.workdir(name)
246         ss = klass(workdir, reserved_space=reserved_space,
247                    stats_provider=FakeStatsProvider())
248         ss.setNodeID("\x00" * 20)
249         ss.setServiceParent(self.sparent)
250         return ss
251
252     def test_create(self):
253         ss = self.create("test_create")
254
255     def allocate(self, ss, storage_index, sharenums, size, canary=None):
256         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
257         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
258         if not canary:
259             canary = FakeCanary()
260         return ss.remote_allocate_buckets(storage_index,
261                                           renew_secret, cancel_secret,
262                                           sharenums, size, canary)
263
264     def test_dont_overfill_dirs(self):
265         """
266         This test asserts that if you add a second share whose storage index
267         share lots of leading bits with an extant share (but isn't the exact
268         same storage index), this won't add an entry to the share directory.
269         """
270         ss = self.create("test_dont_overfill_dirs")
271         already, writers = self.allocate(ss, "storageindex", [0], 10)
272         for i, wb in writers.items():
273             wb.remote_write(0, "%10d" % i)
274             wb.remote_close()
275         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
276                                 "shares")
277         children_of_storedir = set(os.listdir(storedir))
278
279         # Now store another one under another storageindex that has leading
280         # chars the same as the first storageindex.
281         already, writers = self.allocate(ss, "storageindey", [0], 10)
282         for i, wb in writers.items():
283             wb.remote_write(0, "%10d" % i)
284             wb.remote_close()
285         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
286                                 "shares")
287         new_children_of_storedir = set(os.listdir(storedir))
288         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
289
290     def test_remove_incoming(self):
291         ss = self.create("test_remove_incoming")
292         already, writers = self.allocate(ss, "vid", range(3), 10)
293         for i,wb in writers.items():
294             wb.remote_write(0, "%10d" % i)
295             wb.remote_close()
296         incoming_share_dir = wb.incominghome
297         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
298         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
299         incoming_dir = os.path.dirname(incoming_prefix_dir)
300         self.failIf(os.path.exists(incoming_bucket_dir))
301         self.failIf(os.path.exists(incoming_prefix_dir))
302         self.failUnless(os.path.exists(incoming_dir))
303
304     def test_allocate(self):
305         ss = self.create("test_allocate")
306
307         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
308
309         canary = FakeCanary()
310         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
311         self.failUnlessEqual(already, set())
312         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
313
314         # while the buckets are open, they should not count as readable
315         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
316
317         # close the buckets
318         for i,wb in writers.items():
319             wb.remote_write(0, "%25d" % i)
320             wb.remote_close()
321             # aborting a bucket that was already closed is a no-op
322             wb.remote_abort()
323
324         # now they should be readable
325         b = ss.remote_get_buckets("allocate")
326         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
327         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
328
329         # now if we ask about writing again, the server should offer those
330         # three buckets as already present. It should offer them even if we
331         # don't ask about those specific ones.
332         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
333         self.failUnlessEqual(already, set([0,1,2]))
334         self.failUnlessEqual(set(writers.keys()), set([3,4]))
335
336         # while those two buckets are open for writing, the server should
337         # refuse to offer them to uploaders
338
339         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
340         self.failUnlessEqual(already2, set([0,1,2]))
341         self.failUnlessEqual(set(writers2.keys()), set([5]))
342
343         # aborting the writes should remove the tempfiles
344         for i,wb in writers2.items():
345             wb.remote_abort()
346         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
347         self.failUnlessEqual(already2, set([0,1,2]))
348         self.failUnlessEqual(set(writers2.keys()), set([5]))
349
350         for i,wb in writers2.items():
351             wb.remote_abort()
352         for i,wb in writers.items():
353             wb.remote_abort()
354
355     def test_disconnect(self):
356         # simulate a disconnection
357         ss = self.create("test_disconnect")
358         canary = FakeCanary()
359         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
360         self.failUnlessEqual(already, set())
361         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
362         for (f,args,kwargs) in canary.disconnectors.values():
363             f(*args, **kwargs)
364         del already
365         del writers
366
367         # that ought to delete the incoming shares
368         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
369         self.failUnlessEqual(already, set())
370         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
371
372     def test_reserved_space(self):
373         ss = self.create("test_reserved_space", reserved_space=10000,
374                          klass=FakeDiskStorageServer)
375         # the FakeDiskStorageServer doesn't do real statvfs() calls
376         ss.DISKAVAIL = 15000
377         # 15k available, 10k reserved, leaves 5k for shares
378
379         # a newly created and filled share incurs this much overhead, beyond
380         # the size we request.
381         OVERHEAD = 3*4
382         LEASE_SIZE = 4+32+32+4
383         canary = FakeCanary(True)
384         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
385         self.failUnlessEqual(len(writers), 3)
386         # now the StorageServer should have 3000 bytes provisionally
387         # allocated, allowing only 2000 more to be claimed
388         self.failUnlessEqual(len(ss._active_writers), 3)
389
390         # allocating 1001-byte shares only leaves room for one
391         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
392         self.failUnlessEqual(len(writers2), 1)
393         self.failUnlessEqual(len(ss._active_writers), 4)
394
395         # we abandon the first set, so their provisional allocation should be
396         # returned
397         del already
398         del writers
399         self.failUnlessEqual(len(ss._active_writers), 1)
400         # now we have a provisional allocation of 1001 bytes
401
402         # and we close the second set, so their provisional allocation should
403         # become real, long-term allocation, and grows to include the
404         # overhead.
405         for bw in writers2.values():
406             bw.remote_write(0, "a"*25)
407             bw.remote_close()
408         del already2
409         del writers2
410         del bw
411         self.failUnlessEqual(len(ss._active_writers), 0)
412
413         allocated = 1001 + OVERHEAD + LEASE_SIZE
414
415         # we have to manually increase DISKAVAIL, since we're not doing real
416         # disk measurements
417         ss.DISKAVAIL -= allocated
418
419         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
420         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
421         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
422         self.failUnlessEqual(len(writers3), 39)
423         self.failUnlessEqual(len(ss._active_writers), 39)
424
425         del already3
426         del writers3
427         self.failUnlessEqual(len(ss._active_writers), 0)
428         ss.disownServiceParent()
429         del ss
430
431     def test_seek(self):
432         basedir = self.workdir("test_seek_behavior")
433         fileutil.make_dirs(basedir)
434         filename = os.path.join(basedir, "testfile")
435         f = open(filename, "wb")
436         f.write("start")
437         f.close()
438         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
439         # files. mode="a" preserves previous contents but does not allow
440         # seeking-to-create-holes. mode="r+" allows both.
441         f = open(filename, "rb+")
442         f.seek(100)
443         f.write("100")
444         f.close()
445         filelen = os.stat(filename)[stat.ST_SIZE]
446         self.failUnlessEqual(filelen, 100+3)
447         f2 = open(filename, "rb")
448         self.failUnlessEqual(f2.read(5), "start")
449
450
451     def test_leases(self):
452         ss = self.create("test_leases")
453         canary = FakeCanary()
454         sharenums = range(5)
455         size = 100
456
457         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
458                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
459         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
460                                                      sharenums, size, canary)
461         self.failUnlessEqual(len(already), 0)
462         self.failUnlessEqual(len(writers), 5)
463         for wb in writers.values():
464             wb.remote_close()
465
466         leases = list(ss.get_leases("si0"))
467         self.failUnlessEqual(len(leases), 1)
468         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
469
470         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
471                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
472         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
473                                                      sharenums, size, canary)
474         for wb in writers.values():
475             wb.remote_close()
476
477         # take out a second lease on si1
478         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
479                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
480         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
481                                                      sharenums, size, canary)
482         self.failUnlessEqual(len(already), 5)
483         self.failUnlessEqual(len(writers), 0)
484
485         leases = list(ss.get_leases("si1"))
486         self.failUnlessEqual(len(leases), 2)
487         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
488
489         # check that si0 is readable
490         readers = ss.remote_get_buckets("si0")
491         self.failUnlessEqual(len(readers), 5)
492
493         # renew the first lease. Only the proper renew_secret should work
494         ss.remote_renew_lease("si0", rs0)
495         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
496         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
497
498         # check that si0 is still readable
499         readers = ss.remote_get_buckets("si0")
500         self.failUnlessEqual(len(readers), 5)
501
502         # now cancel it
503         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
504         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
505         ss.remote_cancel_lease("si0", cs0)
506
507         # si0 should now be gone
508         readers = ss.remote_get_buckets("si0")
509         self.failUnlessEqual(len(readers), 0)
510         # and the renew should no longer work
511         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
512
513
514         # cancel the first lease on si1, leaving the second in place
515         ss.remote_cancel_lease("si1", cs1)
516         readers = ss.remote_get_buckets("si1")
517         self.failUnlessEqual(len(readers), 5)
518         # the corresponding renew should no longer work
519         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
520
521         leases = list(ss.get_leases("si1"))
522         self.failUnlessEqual(len(leases), 1)
523         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
524
525         ss.remote_renew_lease("si1", rs2)
526         # cancelling the second should make it go away
527         ss.remote_cancel_lease("si1", cs2)
528         readers = ss.remote_get_buckets("si1")
529         self.failUnlessEqual(len(readers), 0)
530         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
531         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
532
533         leases = list(ss.get_leases("si1"))
534         self.failUnlessEqual(len(leases), 0)
535
536
537         # test overlapping uploads
538         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
539                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
540         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
541                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
542         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
543                                                      sharenums, size, canary)
544         self.failUnlessEqual(len(already), 0)
545         self.failUnlessEqual(len(writers), 5)
546         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
547                                                        sharenums, size, canary)
548         self.failUnlessEqual(len(already2), 0)
549         self.failUnlessEqual(len(writers2), 0)
550         for wb in writers.values():
551             wb.remote_close()
552
553         leases = list(ss.get_leases("si3"))
554         self.failUnlessEqual(len(leases), 1)
555
556         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
557                                                        sharenums, size, canary)
558         self.failUnlessEqual(len(already3), 5)
559         self.failUnlessEqual(len(writers3), 0)
560
561         leases = list(ss.get_leases("si3"))
562         self.failUnlessEqual(len(leases), 2)
563
564     def test_readonly(self):
565         workdir = self.workdir("test_readonly")
566         ss = StorageServer(workdir, readonly_storage=True)
567         ss.setNodeID("\x00" * 20)
568         ss.setServiceParent(self.sparent)
569
570         canary = FakeCanary()
571         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
572         self.failUnlessEqual(already, set())
573         self.failUnlessEqual(writers, {})
574
575         stats = ss.get_stats()
576         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
577                              False)
578         if "storage_server.disk_avail" in stats:
579             # windows does not have os.statvfs, so it doesn't give us disk
580             # stats. But if there are stats, readonly_storage means
581             # disk_avail=0
582             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
583
584     def test_discard(self):
585         # discard is really only used for other tests, but we test it anyways
586         workdir = self.workdir("test_discard")
587         ss = StorageServer(workdir, discard_storage=True)
588         ss.setNodeID("\x00" * 20)
589         ss.setServiceParent(self.sparent)
590
591         canary = FakeCanary()
592         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
593         self.failUnlessEqual(already, set())
594         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
595         for i,wb in writers.items():
596             wb.remote_write(0, "%25d" % i)
597             wb.remote_close()
598         # since we discard the data, the shares should be present but sparse.
599         # Since we write with some seeks, the data we read back will be all
600         # zeros.
601         b = ss.remote_get_buckets("vid")
602         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
603         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
604
605     def test_advise_corruption(self):
606         workdir = self.workdir("test_advise_corruption")
607         ss = StorageServer(workdir, discard_storage=True)
608         ss.setNodeID("\x00" * 20)
609         ss.setServiceParent(self.sparent)
610
611         si0_s = base32.b2a("si0")
612         ss.remote_advise_corrupt_share("immutable", "si0", 0,
613                                        "This share smells funny.\n")
614         reportdir = os.path.join(workdir, "corruption-advisories")
615         reports = os.listdir(reportdir)
616         self.failUnlessEqual(len(reports), 1)
617         report_si0 = reports[0]
618         self.failUnless(si0_s in report_si0, report_si0)
619         f = open(os.path.join(reportdir, report_si0), "r")
620         report = f.read()
621         f.close()
622         self.failUnless("type: immutable" in report)
623         self.failUnless(("storage_index: %s" % si0_s) in report)
624         self.failUnless("share_number: 0" in report)
625         self.failUnless("This share smells funny." in report)
626
627         # test the RIBucketWriter version too
628         si1_s = base32.b2a("si1")
629         already,writers = self.allocate(ss, "si1", [1], 75)
630         self.failUnlessEqual(already, set())
631         self.failUnlessEqual(set(writers.keys()), set([1]))
632         writers[1].remote_write(0, "data")
633         writers[1].remote_close()
634
635         b = ss.remote_get_buckets("si1")
636         self.failUnlessEqual(set(b.keys()), set([1]))
637         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
638
639         reports = os.listdir(reportdir)
640         self.failUnlessEqual(len(reports), 2)
641         report_si1 = [r for r in reports if si1_s in r][0]
642         f = open(os.path.join(reportdir, report_si1), "r")
643         report = f.read()
644         f.close()
645         self.failUnless("type: immutable" in report)
646         self.failUnless(("storage_index: %s" % si1_s) in report)
647         self.failUnless("share_number: 1" in report)
648         self.failUnless("This share tastes like dust." in report)
649
650
651
652 class MutableServer(unittest.TestCase):
653
654     def setUp(self):
655         self.sparent = LoggingServiceParent()
656         self._lease_secret = itertools.count()
657     def tearDown(self):
658         return self.sparent.stopService()
659
660     def workdir(self, name):
661         basedir = os.path.join("storage", "MutableServer", name)
662         return basedir
663
664     def create(self, name):
665         workdir = self.workdir(name)
666         ss = StorageServer(workdir)
667         ss.setServiceParent(self.sparent)
668         ss.setNodeID("\x00" * 20)
669         return ss
670
671     def test_create(self):
672         ss = self.create("test_create")
673
674     def write_enabler(self, we_tag):
675         return hashutil.tagged_hash("we_blah", we_tag)
676
677     def renew_secret(self, tag):
678         return hashutil.tagged_hash("renew_blah", str(tag))
679
680     def cancel_secret(self, tag):
681         return hashutil.tagged_hash("cancel_blah", str(tag))
682
683     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
684         write_enabler = self.write_enabler(we_tag)
685         renew_secret = self.renew_secret(lease_tag)
686         cancel_secret = self.cancel_secret(lease_tag)
687         rstaraw = ss.remote_slot_testv_and_readv_and_writev
688         testandwritev = dict( [ (shnum, ([], [], None) )
689                          for shnum in sharenums ] )
690         readv = []
691         rc = rstaraw(storage_index,
692                      (write_enabler, renew_secret, cancel_secret),
693                      testandwritev,
694                      readv)
695         (did_write, readv_data) = rc
696         self.failUnless(did_write)
697         self.failUnless(isinstance(readv_data, dict))
698         self.failUnlessEqual(len(readv_data), 0)
699
700     def test_container_size(self):
701         ss = self.create("test_container_size")
702         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
703                       set([0,1,2]), 100)
704         rstaraw = ss.remote_slot_testv_and_readv_and_writev
705         secrets = ( self.write_enabler("we1"),
706                     self.renew_secret("we1"),
707                     self.cancel_secret("we1") )
708         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
709         answer = rstaraw("si1", secrets,
710                          {0: ([], [(0,data)], len(data)+12)},
711                          [])
712         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
713
714         # trying to make the container too large will raise an exception
715         TOOBIG = MutableShareFile.MAX_SIZE + 10
716         self.failUnlessRaises(DataTooLargeError,
717                               rstaraw, "si1", secrets,
718                               {0: ([], [(0,data)], TOOBIG)},
719                               [])
720
721         # it should be possible to make the container smaller, although at
722         # the moment this doesn't actually affect the share
723         answer = rstaraw("si1", secrets,
724                          {0: ([], [(0,data)], len(data)+8)},
725                          [])
726         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
727
    def test_allocate(self):
        """Allocate mutable shares, then exercise reads, a write, a bad
        write-enabler, and two failing test vectors.

        Every assertion depends on the exact sequence of prior writes, so
        the steps below must stay in order.
        """
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        # freshly-allocated shares are empty: every read returns ""
        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # 100 bytes: "0000000000111111111122..." up through "9999999999"
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write-enabler must be rejected, and the error message
        # must name the nodeid that recorded the original enabler (so
        # shares can be migrated between servers)
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        # a failed testv returns the actual data at the tested ranges,
        # and the write must NOT have been applied
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
796
797
    def test_operators(self):
        """Exercise every testv comparison operator (lt/le/eq/ne/ge/gt).

        The bytes under comparison are always '11111' (offset 10, length 5
        of the standard 100-byte pattern). Each operator is tried with
        operands on both sides of the comparison; after each attempt the
        share is reset to the standard pattern. A passing test applies the
        write ("y"*100); a failing one leaves the data untouched.
        """
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            # restore the standard 100-byte pattern in share 0
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()
977
978     def test_readv(self):
979         ss = self.create("test_readv")
980         secrets = ( self.write_enabler("we1"),
981                     self.renew_secret("we1"),
982                     self.cancel_secret("we1") )
983         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
984         write = ss.remote_slot_testv_and_readv_and_writev
985         read = ss.remote_slot_readv
986         data = [("%d" % i) * 100 for i in range(3)]
987         rc = write("si1", secrets,
988                    {0: ([], [(0,data[0])], None),
989                     1: ([], [(0,data[1])], None),
990                     2: ([], [(0,data[2])], None),
991                     }, [])
992         self.failUnlessEqual(rc, (True, {}))
993
994         answer = read("si1", [], [(0, 10)])
995         self.failUnlessEqual(answer, {0: ["0"*10],
996                                       1: ["1"*10],
997                                       2: ["2"*10]})
998
999     def compare_leases_without_timestamps(self, leases_a, leases_b):
1000         self.failUnlessEqual(len(leases_a), len(leases_b))
1001         for i in range(len(leases_a)):
1002             num_a, a = leases_a[i]
1003             num_b, b = leases_b[i]
1004             self.failUnlessEqual(num_a, num_b)
1005             self.failUnlessEqual(a.owner_num,       b.owner_num)
1006             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1007             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1008             self.failUnlessEqual(a.nodeid,          b.nodeid)
1009
1010     def compare_leases(self, leases_a, leases_b):
1011         self.failUnlessEqual(len(leases_a), len(leases_b))
1012         for i in range(len(leases_a)):
1013             num_a, a = leases_a[i]
1014             num_b, b = leases_b[i]
1015             self.failUnlessEqual(num_a, num_b)
1016             self.failUnlessEqual(a.owner_num,       b.owner_num)
1017             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1018             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1019             self.failUnlessEqual(a.nodeid,          b.nodeid)
1020             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1021
    def test_leases(self):
        """Exercise the mutable-share lease lifecycle: create, renew,
        cancel, and lease preservation across container resizes.

        Uses a distinct (renew, cancel) secret pair per lease via
        secrets(n); the write-enabler is shared. Every step depends on
        the server state built up by the previous ones.
        """
        ss = self.create("test_leases")
        def secrets(n):
            # per-lease secrets: same write enabler, distinct renew/cancel
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        # six leases were granted, one cancelled: five remain on share 0
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # writes may move the leases (container growth) but must preserve
        # their contents, even though renewal timestamps can change
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling the final lease deletes the share
        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")
1145
1146
1147 class Stats(unittest.TestCase):
1148
1149     def setUp(self):
1150         self.sparent = LoggingServiceParent()
1151         self._lease_secret = itertools.count()
1152     def tearDown(self):
1153         return self.sparent.stopService()
1154
1155     def workdir(self, name):
1156         basedir = os.path.join("storage", "Server", name)
1157         return basedir
1158
1159     def create(self, name):
1160         workdir = self.workdir(name)
1161         ss = StorageServer(workdir)
1162         ss.setNodeID("\x00" * 20)
1163         ss.setServiceParent(self.sparent)
1164         return ss
1165
    def test_latencies(self):
        """Feed known latency samples into each category and check the
        mean and percentile summaries produced by get_latencies().

        The assertions below imply the server retains only the most
        recent 1000 samples per category (the 'allocate' stats reflect
        samples 9000..9999 even though 10000 were added).
        """
        ss = self.create("test_latencies")
        # 10000 samples, only the last 1000 (9000..9999) should be kept
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        # exactly at the retention limit
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        # a small category: 0, 2, 4, ..., 18
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        # a single-sample category
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)