]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
4c0544433680b985d50d49b4e32f5fff9e61b052
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1 from twisted.trial import unittest
2
3 from twisted.internet import defer
4 import time, os.path, stat
5 import itertools
6 from allmydata import interfaces
7 from allmydata.util import fileutil, hashutil, base32
8 from allmydata.storage import BucketWriter, BucketReader, \
9      StorageServer, MutableShareFile, \
10      storage_index_to_dir, DataTooLargeError, LeaseInfo
11 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
12      ReadBucketProxy
13 from allmydata.interfaces import BadWriteEnablerError
14 from allmydata.test.common import LoggingServiceParent
15
class Marker:
    # Opaque token returned by FakeCanary.notifyOnDisconnect(); it is used
    # only as a dict key so the subscription can be cancelled later.
    pass
class FakeCanary:
    """Stand-in for a Foolscap 'canary' object: records disconnect
    subscriptions in self.disconnectors so tests can fire them by hand
    (see Server.test_disconnect). With ignore_disconnectors=True the
    subscriptions are silently discarded instead."""
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Register f(*args, **kwargs) to run on 'disconnect'. Returns an
        # opaque Marker for later cancellation, or None when ignoring.
        if not self.ignore:
            marker = Marker()
            self.disconnectors[marker] = (f, args, kwargs)
            return marker
    def dontNotifyOnDisconnect(self, marker):
        # Cancel a subscription previously made with notifyOnDisconnect().
        if not self.ignore:
            del self.disconnectors[marker]
32
class FakeStatsProvider:
    """No-op stats provider: satisfies the interface StorageServer expects
    from its stats_provider argument while discarding everything."""
    def count(self, name, delta=1):
        """Ignore a counter increment."""
        pass
    def register_producer(self, producer):
        """Ignore a stats-producer registration."""
        pass
38
class Bucket(unittest.TestCase):
    """Exercise BucketWriter/BucketReader directly against the filesystem.
    This TestCase also plays the role of the storage server object that
    BucketWriter calls back into (bucket_writer_closed etc.)."""

    def make_workdir(self, name):
        """Create storage/Bucket/<name>/ plus its tmp/ subdir; return the
        (incoming, final) share-file paths inside it."""
        basedir = os.path.join("storage", "Bucket", name)
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return (os.path.join(basedir, "tmp", "bucket"),
                os.path.join(basedir, "bucket"))

    # the next three methods stand in for the server that BucketWriter and
    # BucketReader report back to; nothing needs to happen here
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        """Return a throwaway LeaseInfo: owner 0, random secrets, expiring
        5000 seconds from now, all-zeros nodeid."""
        return LeaseInfo(0, os.urandom(32), os.urandom(32),
                         time.time() + 5000, "\x00" * 20)

    def test_create(self):
        """Writing a full share in several pieces and closing must succeed."""
        incoming, final = self.make_workdir("test_create")
        writer = BucketWriter(self, incoming, final, 200, self.make_lease(),
                              FakeCanary())
        offset = 0
        for chunk in ["a"*25, "b"*25, "c"*25, "d"*7]:
            writer.remote_write(offset, chunk)
            offset += len(chunk)
        writer.remote_close()

    def test_readwrite(self):
        """Data written through BucketWriter must read back identically
        through BucketReader."""
        incoming, final = self.make_workdir("test_readwrite")
        writer = BucketWriter(self, incoming, final, 200, self.make_lease(),
                              FakeCanary())
        writer.remote_write(0, "a"*25)
        writer.remote_write(25, "b"*25)
        writer.remote_write(50, "c"*7) # the final block can be shorter
        writer.remote_close()

        # read the finished share back through a BucketReader
        reader = BucketReader(self, writer.finalhome)
        self.failUnlessEqual(reader.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(reader.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(reader.remote_read(50, 7), "c"*7)
87
class RemoteBucket:
    """Fake remote reference: turns callRemote("foo", ...) into a local
    call of self.target.remote_foo(...), wrapped in a Deferred. The test
    assigns .target after construction."""

    def callRemote(self, methname, *args, **kwargs):
        def _invoke():
            # resolve the remote_* method on the target and call it
            return getattr(self.target, "remote_" + methname)(*args, **kwargs)
        # maybeDeferred gives callers a Deferred whether the call returns a
        # plain value, a Deferred, or raises
        return defer.maybeDeferred(_invoke)
95
class BucketProxy(unittest.TestCase):
    # Exercise WriteBucketProxy/ReadBucketProxy, which translate the
    # structured put_block/get_block interface into offset-based
    # remote_write/remote_read calls on a bucket. This TestCase also plays
    # the role of the storage server for BucketWriter's callbacks.

    def make_bucket(self, name, size):
        # Build a real on-disk BucketWriter plus a RemoteBucket wrapper that
        # forwards callRemote() to it; returns (writer, remote, final_path).
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        # a throwaway lease: owner 0, random secrets, expires in 5000s
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    # no-op server callbacks expected by BucketWriter
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        # a freshly built WriteBucketProxy must provide IStorageBucketWriter
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Shared body for the v1/v2 round-trip tests: write a complete share
        # through wbp_class, then read every piece back through rbp_class
        # and check it matches. header_size distinguishes the two layouts.
        #
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        # write the whole share, one section at a time
        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        # round-trip through the v1 layout (0x24-byte header)
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        # round-trip through the v2 layout (0x44-byte header, wide offsets)
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
218
class FakeDiskStorageServer(StorageServer):
    # A StorageServer whose free-disk measurement is canned: tests assign
    # ss.DISKAVAIL instead of relying on a real statvfs() call, so the
    # reserved-space logic can be tested deterministically.
    def stat_disk(self, d):
        # 'd' (the directory to stat) is ignored; return the canned value
        return self.DISKAVAIL
222
class Server(unittest.TestCase):
    """Tests for the immutable-share (bucket) side of StorageServer:
    allocation, reading, disconnect cleanup, reserved space, leases,
    read-only/discard modes, and corruption advisories."""

    def setUp(self):
        self.sparent = LoggingServiceParent()
        # each allocation draws fresh lease secrets from this counter
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the per-test working directory under storage/Server/."""
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        """Create and start a StorageServer (or subclass) in the per-test
        workdir, wired to a no-op stats provider."""
        workdir = self.workdir(name)
        ss = klass(workdir, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        # smoke test: constructing and starting the server must not raise
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        """Call remote_allocate_buckets with fresh lease secrets; returns
        the (already-present, writers) pair."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        # share offsets must not be limited to 32 bits
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        # once all writers close, the per-bucket and per-prefix dirs under
        # incoming/ should be removed, but incoming/ itself should survive
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        """Full allocate/write/close/re-allocate/abort lifecycle."""
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        # fire the disconnect handlers the server registered on our canary
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        """reserved_space must limit how many shares can be provisionally
        and permanently allocated."""
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        """Sanity-check the seek-past-end-and-write behavior the share code
        relies on: mode "rb+" truncates nothing and allows sparse writes."""
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
        f2.close()


    def test_leases(self):
        """Lease bookkeeping: creation, add-lease, renew, cancel, and the
        interaction with overlapping uploads."""
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        """A read-only server refuses allocations and reports itself as
        not accepting immutable shares (disk_avail=0 where available)."""
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        """Corruption advisories (both server- and bucket-level) must be
        written as files under corruption-advisories/."""
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setNodeID("\x00" * 20)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)
669
670
671
672 class MutableServer(unittest.TestCase):
673
    def setUp(self):
        # service parent that collects log events and owns the servers
        self.sparent = LoggingServiceParent()
        # each test draws fresh lease secrets from this counter
        self._lease_secret = itertools.count()
    def tearDown(self):
        # stopping the parent stops every StorageServer created under it
        return self.sparent.stopService()
679
680     def workdir(self, name):
681         basedir = os.path.join("storage", "MutableServer", name)
682         return basedir
683
684     def create(self, name):
685         workdir = self.workdir(name)
686         ss = StorageServer(workdir)
687         ss.setServiceParent(self.sparent)
688         ss.setNodeID("\x00" * 20)
689         return ss
690
691     def test_create(self):
692         ss = self.create("test_create")
693
    def write_enabler(self, we_tag):
        # deterministic write-enabler derived from the tag, so later calls
        # with the same tag recompute the same secret
        return hashutil.tagged_hash("we_blah", we_tag)
696
    def renew_secret(self, tag):
        # deterministic lease-renewal secret derived from the tag
        return hashutil.tagged_hash("renew_blah", str(tag))
699
700     def cancel_secret(self, tag):
701         return hashutil.tagged_hash("cancel_blah", str(tag))
702
703     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
704         write_enabler = self.write_enabler(we_tag)
705         renew_secret = self.renew_secret(lease_tag)
706         cancel_secret = self.cancel_secret(lease_tag)
707         rstaraw = ss.remote_slot_testv_and_readv_and_writev
708         testandwritev = dict( [ (shnum, ([], [], None) )
709                          for shnum in sharenums ] )
710         readv = []
711         rc = rstaraw(storage_index,
712                      (write_enabler, renew_secret, cancel_secret),
713                      testandwritev,
714                      readv)
715         (did_write, readv_data) = rc
716         self.failUnless(did_write)
717         self.failUnless(isinstance(readv_data, dict))
718         self.failUnlessEqual(len(readv_data), 0)
719
720     def test_container_size(self):
721         ss = self.create("test_container_size")
722         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
723                       set([0,1,2]), 100)
724         rstaraw = ss.remote_slot_testv_and_readv_and_writev
725         secrets = ( self.write_enabler("we1"),
726                     self.renew_secret("we1"),
727                     self.cancel_secret("we1") )
728         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
729         answer = rstaraw("si1", secrets,
730                          {0: ([], [(0,data)], len(data)+12)},
731                          [])
732         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
733
734         # trying to make the container too large will raise an exception
735         TOOBIG = MutableShareFile.MAX_SIZE + 10
736         self.failUnlessRaises(DataTooLargeError,
737                               rstaraw, "si1", secrets,
738                               {0: ([], [(0,data)], TOOBIG)},
739                               [])
740
741         # it should be possible to make the container smaller, although at
742         # the moment this doesn't actually affect the share
743         answer = rstaraw("si1", secrets,
744                          {0: ([], [(0,data)], len(data)+8)},
745                          [])
746         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
747
748     def test_allocate(self):
749         ss = self.create("test_allocate")
750         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
751                       set([0,1,2]), 100)
752
753         read = ss.remote_slot_readv
754         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
755                              {0: [""]})
756         self.failUnlessEqual(read("si1", [], [(0, 10)]),
757                              {0: [""], 1: [""], 2: [""]})
758         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
759                              {0: [""]})
760
761         # try writing to one
762         secrets = ( self.write_enabler("we1"),
763                     self.renew_secret("we1"),
764                     self.cancel_secret("we1") )
765         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
766         write = ss.remote_slot_testv_and_readv_and_writev
767         answer = write("si1", secrets,
768                        {0: ([], [(0,data)], None)},
769                        [])
770         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
771
772         self.failUnlessEqual(read("si1", [0], [(0,20)]),
773                              {0: ["00000000001111111111"]})
774         self.failUnlessEqual(read("si1", [0], [(95,10)]),
775                              {0: ["99999"]})
776         #self.failUnlessEqual(s0.remote_get_length(), 100)
777
778         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
779         f = self.failUnlessRaises(BadWriteEnablerError,
780                                   write, "si1", bad_secrets,
781                                   {}, [])
782         self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
783
784         # this testv should fail
785         answer = write("si1", secrets,
786                        {0: ([(0, 12, "eq", "444444444444"),
787                              (20, 5, "eq", "22222"),
788                              ],
789                             [(0, "x"*100)],
790                             None),
791                         },
792                        [(0,12), (20,5)],
793                        )
794         self.failUnlessEqual(answer, (False,
795                                       {0: ["000000000011", "22222"],
796                                        1: ["", ""],
797                                        2: ["", ""],
798                                        }))
799         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
800
801         # as should this one
802         answer = write("si1", secrets,
803                        {0: ([(10, 5, "lt", "11111"),
804                              ],
805                             [(0, "x"*100)],
806                             None),
807                         },
808                        [(10,5)],
809                        )
810         self.failUnlessEqual(answer, (False,
811                                       {0: ["11111"],
812                                        1: [""],
813                                        2: [""]},
814                                       ))
815         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
816
817
818     def test_operators(self):
819         # test operators, the data we're comparing is '11111' in all cases.
820         # test both fail+pass, reset data after each one.
821         ss = self.create("test_operators")
822
823         secrets = ( self.write_enabler("we1"),
824                     self.renew_secret("we1"),
825                     self.cancel_secret("we1") )
826         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
827         write = ss.remote_slot_testv_and_readv_and_writev
828         read = ss.remote_slot_readv
829
830         def reset():
831             write("si1", secrets,
832                   {0: ([], [(0,data)], None)},
833                   [])
834
835         reset()
836
837         #  lt
838         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
839                                              ],
840                                             [(0, "x"*100)],
841                                             None,
842                                             )}, [(10,5)])
843         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
844         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
845         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
846         reset()
847
848         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
849                                              ],
850                                             [(0, "x"*100)],
851                                             None,
852                                             )}, [(10,5)])
853         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
854         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
855         reset()
856
857         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
858                                              ],
859                                             [(0, "y"*100)],
860                                             None,
861                                             )}, [(10,5)])
862         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
863         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
864         reset()
865
866         #  le
867         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
868                                              ],
869                                             [(0, "x"*100)],
870                                             None,
871                                             )}, [(10,5)])
872         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
873         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
874         reset()
875
876         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
877                                              ],
878                                             [(0, "y"*100)],
879                                             None,
880                                             )}, [(10,5)])
881         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
882         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
883         reset()
884
885         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
886                                              ],
887                                             [(0, "y"*100)],
888                                             None,
889                                             )}, [(10,5)])
890         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
891         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
892         reset()
893
894         #  eq
895         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
896                                              ],
897                                             [(0, "x"*100)],
898                                             None,
899                                             )}, [(10,5)])
900         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
901         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
902         reset()
903
904         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
905                                              ],
906                                             [(0, "y"*100)],
907                                             None,
908                                             )}, [(10,5)])
909         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
910         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
911         reset()
912
913         #  ne
914         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
915                                              ],
916                                             [(0, "x"*100)],
917                                             None,
918                                             )}, [(10,5)])
919         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
920         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
921         reset()
922
923         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
924                                              ],
925                                             [(0, "y"*100)],
926                                             None,
927                                             )}, [(10,5)])
928         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
929         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
930         reset()
931
932         #  ge
933         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
934                                              ],
935                                             [(0, "y"*100)],
936                                             None,
937                                             )}, [(10,5)])
938         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
939         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
940         reset()
941
942         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
943                                              ],
944                                             [(0, "y"*100)],
945                                             None,
946                                             )}, [(10,5)])
947         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
948         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
949         reset()
950
951         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
952                                              ],
953                                             [(0, "y"*100)],
954                                             None,
955                                             )}, [(10,5)])
956         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
957         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
958         reset()
959
960         #  gt
961         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
962                                              ],
963                                             [(0, "y"*100)],
964                                             None,
965                                             )}, [(10,5)])
966         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
967         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
968         reset()
969
970         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
971                                              ],
972                                             [(0, "x"*100)],
973                                             None,
974                                             )}, [(10,5)])
975         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
976         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
977         reset()
978
979         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
980                                              ],
981                                             [(0, "x"*100)],
982                                             None,
983                                             )}, [(10,5)])
984         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
985         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
986         reset()
987
988         # finally, test some operators against empty shares
989         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
990                                              ],
991                                             [(0, "x"*100)],
992                                             None,
993                                             )}, [(10,5)])
994         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
995         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
996         reset()
997
998     def test_readv(self):
999         ss = self.create("test_readv")
1000         secrets = ( self.write_enabler("we1"),
1001                     self.renew_secret("we1"),
1002                     self.cancel_secret("we1") )
1003         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1004         write = ss.remote_slot_testv_and_readv_and_writev
1005         read = ss.remote_slot_readv
1006         data = [("%d" % i) * 100 for i in range(3)]
1007         rc = write("si1", secrets,
1008                    {0: ([], [(0,data[0])], None),
1009                     1: ([], [(0,data[1])], None),
1010                     2: ([], [(0,data[2])], None),
1011                     }, [])
1012         self.failUnlessEqual(rc, (True, {}))
1013
1014         answer = read("si1", [], [(0, 10)])
1015         self.failUnlessEqual(answer, {0: ["0"*10],
1016                                       1: ["1"*10],
1017                                       2: ["2"*10]})
1018
1019     def compare_leases_without_timestamps(self, leases_a, leases_b):
1020         self.failUnlessEqual(len(leases_a), len(leases_b))
1021         for i in range(len(leases_a)):
1022             num_a, a = leases_a[i]
1023             num_b, b = leases_b[i]
1024             self.failUnlessEqual(num_a, num_b)
1025             self.failUnlessEqual(a.owner_num,       b.owner_num)
1026             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1027             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1028             self.failUnlessEqual(a.nodeid,          b.nodeid)
1029
1030     def compare_leases(self, leases_a, leases_b):
1031         self.failUnlessEqual(len(leases_a), len(leases_b))
1032         for i in range(len(leases_a)):
1033             num_a, a = leases_a[i]
1034             num_b, b = leases_b[i]
1035             self.failUnlessEqual(num_a, num_b)
1036             self.failUnlessEqual(a.owner_num,       b.owner_num)
1037             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1038             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1039             self.failUnlessEqual(a.nodeid,          b.nodeid)
1040             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1041
1042     def test_leases(self):
1043         ss = self.create("test_leases")
1044         def secrets(n):
1045             return ( self.write_enabler("we1"),
1046                      self.renew_secret("we1-%d" % n),
1047                      self.cancel_secret("we1-%d" % n) )
1048         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1049         write = ss.remote_slot_testv_and_readv_and_writev
1050         read = ss.remote_slot_readv
1051         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1052         self.failUnlessEqual(rc, (True, {}))
1053
1054         # create a random non-numeric file in the bucket directory, to
1055         # exercise the code that's supposed to ignore those.
1056         bucket_dir = os.path.join(self.workdir("test_leases"),
1057                                   "shares", storage_index_to_dir("si1"))
1058         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1059         f.write("you ought to be ignoring me\n")
1060         f.close()
1061
1062         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1063         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1064
1065         # add-lease on a missing storage index is silently ignored
1066         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1067
1068         # re-allocate the slots and use the same secrets, that should update
1069         # the lease
1070         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1071         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1072
1073         # renew it directly
1074         ss.remote_renew_lease("si1", secrets(0)[1])
1075         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1076
1077         # now allocate them with a bunch of different secrets, to trigger the
1078         # extended lease code. Use add_lease for one of them.
1079         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1080         self.failUnlessEqual(len(s0.debug_get_leases()), 2)
1081         secrets2 = secrets(2)
1082         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1083         self.failUnlessEqual(len(s0.debug_get_leases()), 3)
1084         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1085         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1086         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1087
1088         self.failUnlessEqual(len(s0.debug_get_leases()), 6)
1089
1090         # cancel one of them
1091         ss.remote_cancel_lease("si1", secrets(5)[2])
1092         self.failUnlessEqual(len(s0.debug_get_leases()), 5)
1093
1094         all_leases = s0.debug_get_leases()
1095         # and write enough data to expand the container, forcing the server
1096         # to move the leases
1097         write("si1", secrets(0),
1098               {0: ([], [(0,data)], 200), },
1099               [])
1100
1101         # read back the leases, make sure they're still intact.
1102         self.compare_leases_without_timestamps(all_leases,
1103                                                s0.debug_get_leases())
1104
1105         ss.remote_renew_lease("si1", secrets(0)[1])
1106         ss.remote_renew_lease("si1", secrets(1)[1])
1107         ss.remote_renew_lease("si1", secrets(2)[1])
1108         ss.remote_renew_lease("si1", secrets(3)[1])
1109         ss.remote_renew_lease("si1", secrets(4)[1])
1110         self.compare_leases_without_timestamps(all_leases,
1111                                                s0.debug_get_leases())
1112         # get a new copy of the leases, with the current timestamps. Reading
1113         # data and failing to renew/cancel leases should leave the timestamps
1114         # alone.
1115         all_leases = s0.debug_get_leases()
1116         # renewing with a bogus token should prompt an error message
1117
1118         # examine the exception thus raised, make sure the old nodeid is
1119         # present, to provide for share migration
1120         e = self.failUnlessRaises(IndexError,
1121                                   ss.remote_renew_lease, "si1",
1122                                   secrets(20)[1])
1123         e_s = str(e)
1124         self.failUnless("Unable to renew non-existent lease" in e_s)
1125         self.failUnless("I have leases accepted by nodeids:" in e_s)
1126         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1127
1128         # same for cancelling
1129         self.failUnlessRaises(IndexError,
1130                               ss.remote_cancel_lease, "si1",
1131                               secrets(20)[2])
1132         self.compare_leases(all_leases, s0.debug_get_leases())
1133
1134         # reading shares should not modify the timestamp
1135         read("si1", [], [(0,200)])
1136         self.compare_leases(all_leases, s0.debug_get_leases())
1137
1138         write("si1", secrets(0),
1139               {0: ([], [(200, "make me bigger")], None)}, [])
1140         self.compare_leases_without_timestamps(all_leases,
1141                                                s0.debug_get_leases())
1142
1143         write("si1", secrets(0),
1144               {0: ([], [(500, "make me really bigger")], None)}, [])
1145         self.compare_leases_without_timestamps(all_leases,
1146                                                s0.debug_get_leases())
1147
1148         # now cancel them all
1149         ss.remote_cancel_lease("si1", secrets(0)[2])
1150         ss.remote_cancel_lease("si1", secrets(1)[2])
1151         ss.remote_cancel_lease("si1", secrets(2)[2])
1152         ss.remote_cancel_lease("si1", secrets(3)[2])
1153
1154         # the slot should still be there
1155         remaining_shares = read("si1", [], [(0,10)])
1156         self.failUnlessEqual(len(remaining_shares), 1)
1157         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1158
1159         # cancelling a non-existent lease should raise an IndexError
1160         self.failUnlessRaises(IndexError,
1161                               ss.remote_cancel_lease, "si1", "nonsecret")
1162
1163         # and the slot should still be there
1164         remaining_shares = read("si1", [], [(0,10)])
1165         self.failUnlessEqual(len(remaining_shares), 1)
1166         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1167
1168         ss.remote_cancel_lease("si1", secrets(4)[2])
1169         # now the slot should be gone
1170         no_shares = read("si1", [], [(0,10)])
1171         self.failUnlessEqual(no_shares, {})
1172
1173         # cancelling a lease on a non-existent share should raise an IndexError
1174         self.failUnlessRaises(IndexError,
1175                               ss.remote_cancel_lease, "si2", "nonsecret")
1176
1177     def test_remove(self):
1178         ss = self.create("test_remove")
1179         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1180                       set([0,1,2]), 100)
1181         readv = ss.remote_slot_readv
1182         writev = ss.remote_slot_testv_and_readv_and_writev
1183         secrets = ( self.write_enabler("we1"),
1184                     self.renew_secret("we1"),
1185                     self.cancel_secret("we1") )
1186         # delete sh0 by setting its size to zero
1187         answer = writev("si1", secrets,
1188                         {0: ([], [], 0)},
1189                         [])
1190         # the answer should mention all the shares that existed before the
1191         # write
1192         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1193         # but a new read should show only sh1 and sh2
1194         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1195                              {1: [""], 2: [""]})
1196
1197         # delete sh1 by setting its size to zero
1198         answer = writev("si1", secrets,
1199                         {1: ([], [], 0)},
1200                         [])
1201         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1202         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1203                              {2: [""]})
1204
1205         # delete sh2 by setting its size to zero
1206         answer = writev("si1", secrets,
1207                         {2: ([], [], 0)},
1208                         [])
1209         self.failUnlessEqual(answer, (True, {2:[]}) )
1210         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1211                              {})
1212         # and the bucket directory should now be gone
1213         si = base32.b2a("si1")
1214         # note: this is a detail of the storage server implementation, and
1215         # may change in the future
1216         prefix = si[:2]
1217         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1218         bucketdir = os.path.join(prefixdir, si)
1219         self.failUnless(os.path.exists(prefixdir))
1220         self.failIf(os.path.exists(bucketdir))
1221
1222 class Stats(unittest.TestCase):
1223
1224     def setUp(self):
1225         self.sparent = LoggingServiceParent()
1226         self._lease_secret = itertools.count()
1227     def tearDown(self):
1228         return self.sparent.stopService()
1229
1230     def workdir(self, name):
1231         basedir = os.path.join("storage", "Server", name)
1232         return basedir
1233
1234     def create(self, name):
1235         workdir = self.workdir(name)
1236         ss = StorageServer(workdir)
1237         ss.setNodeID("\x00" * 20)
1238         ss.setServiceParent(self.sparent)
1239         return ss
1240
    def test_latencies(self):
        # Feed known sample distributions into several latency categories,
        # then verify the means and percentiles that get_latencies()
        # computes for each of them.
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # only the most recent 1000 samples are retained per category, so
        # "allocate" keeps samples 9000..9999
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        # exactly at the retention limit: samples 0..999 all kept
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        # a small category: ten samples 0,2,..,18; high percentiles clamp
        # to the largest sample
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        # degenerate single-sample category: every statistic equals the
        # one sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)