# Source: tahoe-lafs, src/allmydata/test/test_storage.py
# (commit message: "web/storage: make sure we can handle platforms without os.statvfs too")
1 from twisted.trial import unittest
2
3 from twisted.internet import defer
4 import time, os.path, stat, re
5 import itertools
6 from allmydata import interfaces
7 from allmydata.util import fileutil, hashutil, base32
8 from allmydata.storage.server import StorageServer, storage_index_to_dir
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.immutable import BucketWriter, BucketReader
11 from allmydata.storage.common import DataTooLargeError
12 from allmydata.storage.lease import LeaseInfo
13 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
14      ReadBucketProxy
15 from allmydata.interfaces import BadWriteEnablerError
16 from allmydata.test.common import LoggingServiceParent
17 from allmydata.web.storage import StorageStatus, abbreviate_if_known, \
18      remove_prefix
19
20 class Marker:
21     pass
class FakeCanary:
    """Stand-in for a remote-connection canary.

    Disconnect callbacks registered through notifyOnDisconnect are recorded
    in self.disconnectors (keyed by a Marker token) so tests can fire them
    by hand.  With ignore_disconnectors=True, registrations are silently
    dropped instead.
    """
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if not self.ignore:
            token = Marker()
            self.disconnectors[token] = (f, args, kwargs)
            return token
    def dontNotifyOnDisconnect(self, marker):
        if not self.ignore:
            del self.disconnectors[marker]
36
37 class FakeStatsProvider:
38     def count(self, name, delta=1):
39         pass
40     def register_producer(self, producer):
41         pass
42
class Bucket(unittest.TestCase):
    """Exercise BucketWriter/BucketReader directly; this TestCase plays
    the role of the storage server, providing the callback hooks the
    bucket classes expect."""

    def make_workdir(self, name):
        # Lay out storage/Bucket/<name>/ with a tmp/ area for the
        # incoming share and a final destination beside it.
        base = os.path.join("storage", "Bucket", name)
        fileutil.make_dirs(base)
        fileutil.make_dirs(os.path.join(base, "tmp"))
        return (os.path.join(base, "tmp", "bucket"),
                os.path.join(base, "bucket"))

    # Hooks that BucketWriter/BucketReader invoke on their "server".
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        # Throwaway lease: owner 0, random secrets, expiring 5000s out.
        return LeaseInfo(0, os.urandom(32), os.urandom(32),
                         time.time() + 5000, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        for offset, chunk in ((0, "a"*25), (25, "b"*25),
                              (50, "c"*25), (75, "d"*7)):
            bw.remote_write(offset, chunk)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # read the finished share back through a BucketReader
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
91
class RemoteBucket:
    """Make a local bucket look like a remote one.

    callRemote("foo", ...) invokes self.target.remote_foo(...) and always
    returns a Deferred, mimicking a Foolscap remote reference.
    """

    def callRemote(self, methname, *args, **kwargs):
        def _invoke():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_invoke)
99
100 class BucketProxy(unittest.TestCase):
101     def make_bucket(self, name, size):
102         basedir = os.path.join("storage", "BucketProxy", name)
103         incoming = os.path.join(basedir, "tmp", "bucket")
104         final = os.path.join(basedir, "bucket")
105         fileutil.make_dirs(basedir)
106         fileutil.make_dirs(os.path.join(basedir, "tmp"))
107         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
108                           FakeCanary())
109         rb = RemoteBucket()
110         rb.target = bw
111         return bw, rb, final
112
113     def make_lease(self):
114         owner_num = 0
115         renew_secret = os.urandom(32)
116         cancel_secret = os.urandom(32)
117         expiration_time = time.time() + 5000
118         return LeaseInfo(owner_num, renew_secret, cancel_secret,
119                          expiration_time, "\x00" * 20)
120
121     def bucket_writer_closed(self, bw, consumed):
122         pass
123     def add_latency(self, category, latency):
124         pass
125     def count(self, name, delta=1):
126         pass
127
128     def test_create(self):
129         bw, rb, sharefname = self.make_bucket("test_create", 500)
130         bp = WriteBucketProxy(rb,
131                               data_size=300,
132                               block_size=10,
133                               num_segments=5,
134                               num_share_hashes=3,
135                               uri_extension_size_max=500, nodeid=None)
136         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
137
138     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
139         # Let's pretend each share has 100 bytes of data, and that there are
140         # 4 segments (25 bytes each), and 8 shares total. So the two
141         # per-segment merkle trees (crypttext_hash_tree,
142         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
143         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
144         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
145         # long. That should make the whole share:
146         #
147         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
148         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
149
150         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
151
152         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
153                             for i in range(7)]
154         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
155                         for i in range(7)]
156         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
157                         for i in (1,9,13)]
158         uri_extension = "s" + "E"*498 + "e"
159
160         bw, rb, sharefname = self.make_bucket(name, sharesize)
161         bp = wbp_class(rb,
162                        data_size=95,
163                        block_size=25,
164                        num_segments=4,
165                        num_share_hashes=3,
166                        uri_extension_size_max=len(uri_extension),
167                        nodeid=None)
168
169         d = bp.put_header()
170         d.addCallback(lambda res: bp.put_block(0, "a"*25))
171         d.addCallback(lambda res: bp.put_block(1, "b"*25))
172         d.addCallback(lambda res: bp.put_block(2, "c"*25))
173         d.addCallback(lambda res: bp.put_block(3, "d"*20))
174         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
175         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
176         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
177         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
178         d.addCallback(lambda res: bp.close())
179
180         # now read everything back
181         def _start_reading(res):
182             br = BucketReader(self, sharefname)
183             rb = RemoteBucket()
184             rb.target = br
185             rbp = rbp_class(rb, peerid="abc", storage_index="")
186             self.failUnless("to peer" in repr(rbp))
187             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
188
189             d1 = rbp.get_block_data(0, 25, 25)
190             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
191             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
192             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
193             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
194             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
195             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
196             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
197
198             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
199             d1.addCallback(lambda res:
200                            self.failUnlessEqual(res, crypttext_hashes))
201             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
202             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
203             d1.addCallback(lambda res: rbp.get_share_hashes())
204             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
205             d1.addCallback(lambda res: rbp.get_uri_extension())
206             d1.addCallback(lambda res:
207                            self.failUnlessEqual(res, uri_extension))
208
209             return d1
210
211         d.addCallback(_start_reading)
212
213         return d
214
215     def test_readwrite_v1(self):
216         return self._do_test_readwrite("test_readwrite_v1",
217                                        0x24, WriteBucketProxy, ReadBucketProxy)
218
219     def test_readwrite_v2(self):
220         return self._do_test_readwrite("test_readwrite_v2",
221                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
222
223 class FakeDiskStorageServer(StorageServer):
224     def stat_disk(self, d):
225         return self.DISKAVAIL
226
227 class Server(unittest.TestCase):
228
229     def setUp(self):
230         self.sparent = LoggingServiceParent()
231         self._lease_secret = itertools.count()
232     def tearDown(self):
233         return self.sparent.stopService()
234
235     def workdir(self, name):
236         basedir = os.path.join("storage", "Server", name)
237         return basedir
238
239     def create(self, name, reserved_space=0, klass=StorageServer):
240         workdir = self.workdir(name)
241         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
242                    stats_provider=FakeStatsProvider())
243         ss.setServiceParent(self.sparent)
244         return ss
245
246     def test_create(self):
247         ss = self.create("test_create")
248
249     def allocate(self, ss, storage_index, sharenums, size, canary=None):
250         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
251         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
252         if not canary:
253             canary = FakeCanary()
254         return ss.remote_allocate_buckets(storage_index,
255                                           renew_secret, cancel_secret,
256                                           sharenums, size, canary)
257
258     def test_large_share(self):
259         ss = self.create("test_large_share")
260
261         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
262         self.failUnlessEqual(already, set())
263         self.failUnlessEqual(set(writers.keys()), set([0]))
264
265         shnum, bucket = writers.items()[0]
266         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
267         bucket.remote_write(2**32, "ab")
268         bucket.remote_close()
269
270         readers = ss.remote_get_buckets("allocate")
271         reader = readers[shnum]
272         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
273     test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
274
275     def test_dont_overfill_dirs(self):
276         """
277         This test asserts that if you add a second share whose storage index
278         share lots of leading bits with an extant share (but isn't the exact
279         same storage index), this won't add an entry to the share directory.
280         """
281         ss = self.create("test_dont_overfill_dirs")
282         already, writers = self.allocate(ss, "storageindex", [0], 10)
283         for i, wb in writers.items():
284             wb.remote_write(0, "%10d" % i)
285             wb.remote_close()
286         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
287                                 "shares")
288         children_of_storedir = set(os.listdir(storedir))
289
290         # Now store another one under another storageindex that has leading
291         # chars the same as the first storageindex.
292         already, writers = self.allocate(ss, "storageindey", [0], 10)
293         for i, wb in writers.items():
294             wb.remote_write(0, "%10d" % i)
295             wb.remote_close()
296         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
297                                 "shares")
298         new_children_of_storedir = set(os.listdir(storedir))
299         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
300
301     def test_remove_incoming(self):
302         ss = self.create("test_remove_incoming")
303         already, writers = self.allocate(ss, "vid", range(3), 10)
304         for i,wb in writers.items():
305             wb.remote_write(0, "%10d" % i)
306             wb.remote_close()
307         incoming_share_dir = wb.incominghome
308         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
309         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
310         incoming_dir = os.path.dirname(incoming_prefix_dir)
311         self.failIf(os.path.exists(incoming_bucket_dir))
312         self.failIf(os.path.exists(incoming_prefix_dir))
313         self.failUnless(os.path.exists(incoming_dir))
314
315     def test_allocate(self):
316         ss = self.create("test_allocate")
317
318         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
319
320         canary = FakeCanary()
321         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
322         self.failUnlessEqual(already, set())
323         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
324
325         # while the buckets are open, they should not count as readable
326         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
327
328         # close the buckets
329         for i,wb in writers.items():
330             wb.remote_write(0, "%25d" % i)
331             wb.remote_close()
332             # aborting a bucket that was already closed is a no-op
333             wb.remote_abort()
334
335         # now they should be readable
336         b = ss.remote_get_buckets("allocate")
337         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
338         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
339
340         # now if we ask about writing again, the server should offer those
341         # three buckets as already present. It should offer them even if we
342         # don't ask about those specific ones.
343         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
344         self.failUnlessEqual(already, set([0,1,2]))
345         self.failUnlessEqual(set(writers.keys()), set([3,4]))
346
347         # while those two buckets are open for writing, the server should
348         # refuse to offer them to uploaders
349
350         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
351         self.failUnlessEqual(already2, set([0,1,2]))
352         self.failUnlessEqual(set(writers2.keys()), set([5]))
353
354         # aborting the writes should remove the tempfiles
355         for i,wb in writers2.items():
356             wb.remote_abort()
357         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
358         self.failUnlessEqual(already2, set([0,1,2]))
359         self.failUnlessEqual(set(writers2.keys()), set([5]))
360
361         for i,wb in writers2.items():
362             wb.remote_abort()
363         for i,wb in writers.items():
364             wb.remote_abort()
365
366     def test_disconnect(self):
367         # simulate a disconnection
368         ss = self.create("test_disconnect")
369         canary = FakeCanary()
370         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
371         self.failUnlessEqual(already, set())
372         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
373         for (f,args,kwargs) in canary.disconnectors.values():
374             f(*args, **kwargs)
375         del already
376         del writers
377
378         # that ought to delete the incoming shares
379         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
380         self.failUnlessEqual(already, set())
381         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
382
383     def test_reserved_space(self):
384         ss = self.create("test_reserved_space", reserved_space=10000,
385                          klass=FakeDiskStorageServer)
386         # the FakeDiskStorageServer doesn't do real statvfs() calls
387         ss.DISKAVAIL = 15000
388         # 15k available, 10k reserved, leaves 5k for shares
389
390         # a newly created and filled share incurs this much overhead, beyond
391         # the size we request.
392         OVERHEAD = 3*4
393         LEASE_SIZE = 4+32+32+4
394         canary = FakeCanary(True)
395         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
396         self.failUnlessEqual(len(writers), 3)
397         # now the StorageServer should have 3000 bytes provisionally
398         # allocated, allowing only 2000 more to be claimed
399         self.failUnlessEqual(len(ss._active_writers), 3)
400
401         # allocating 1001-byte shares only leaves room for one
402         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
403         self.failUnlessEqual(len(writers2), 1)
404         self.failUnlessEqual(len(ss._active_writers), 4)
405
406         # we abandon the first set, so their provisional allocation should be
407         # returned
408         del already
409         del writers
410         self.failUnlessEqual(len(ss._active_writers), 1)
411         # now we have a provisional allocation of 1001 bytes
412
413         # and we close the second set, so their provisional allocation should
414         # become real, long-term allocation, and grows to include the
415         # overhead.
416         for bw in writers2.values():
417             bw.remote_write(0, "a"*25)
418             bw.remote_close()
419         del already2
420         del writers2
421         del bw
422         self.failUnlessEqual(len(ss._active_writers), 0)
423
424         allocated = 1001 + OVERHEAD + LEASE_SIZE
425
426         # we have to manually increase DISKAVAIL, since we're not doing real
427         # disk measurements
428         ss.DISKAVAIL -= allocated
429
430         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
431         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
432         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
433         self.failUnlessEqual(len(writers3), 39)
434         self.failUnlessEqual(len(ss._active_writers), 39)
435
436         del already3
437         del writers3
438         self.failUnlessEqual(len(ss._active_writers), 0)
439         ss.disownServiceParent()
440         del ss
441
442     def test_seek(self):
443         basedir = self.workdir("test_seek_behavior")
444         fileutil.make_dirs(basedir)
445         filename = os.path.join(basedir, "testfile")
446         f = open(filename, "wb")
447         f.write("start")
448         f.close()
449         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
450         # files. mode="a" preserves previous contents but does not allow
451         # seeking-to-create-holes. mode="r+" allows both.
452         f = open(filename, "rb+")
453         f.seek(100)
454         f.write("100")
455         f.close()
456         filelen = os.stat(filename)[stat.ST_SIZE]
457         self.failUnlessEqual(filelen, 100+3)
458         f2 = open(filename, "rb")
459         self.failUnlessEqual(f2.read(5), "start")
460
461
462     def test_leases(self):
463         ss = self.create("test_leases")
464         canary = FakeCanary()
465         sharenums = range(5)
466         size = 100
467
468         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
469                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
470         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
471                                                      sharenums, size, canary)
472         self.failUnlessEqual(len(already), 0)
473         self.failUnlessEqual(len(writers), 5)
474         for wb in writers.values():
475             wb.remote_close()
476
477         leases = list(ss.get_leases("si0"))
478         self.failUnlessEqual(len(leases), 1)
479         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
480
481         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
482                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
483         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
484                                                      sharenums, size, canary)
485         for wb in writers.values():
486             wb.remote_close()
487
488         # take out a second lease on si1
489         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
490                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
491         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
492                                                      sharenums, size, canary)
493         self.failUnlessEqual(len(already), 5)
494         self.failUnlessEqual(len(writers), 0)
495
496         leases = list(ss.get_leases("si1"))
497         self.failUnlessEqual(len(leases), 2)
498         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
499
500         # and a third lease, using add-lease
501         rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
502                      hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
503         ss.remote_add_lease("si1", rs2a, cs2a)
504         leases = list(ss.get_leases("si1"))
505         self.failUnlessEqual(len(leases), 3)
506         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
507
508         # add-lease on a missing storage index is silently ignored
509         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
510
511         # check that si0 is readable
512         readers = ss.remote_get_buckets("si0")
513         self.failUnlessEqual(len(readers), 5)
514
515         # renew the first lease. Only the proper renew_secret should work
516         ss.remote_renew_lease("si0", rs0)
517         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
518         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
519
520         # check that si0 is still readable
521         readers = ss.remote_get_buckets("si0")
522         self.failUnlessEqual(len(readers), 5)
523
524         # now cancel it
525         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
526         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
527         ss.remote_cancel_lease("si0", cs0)
528
529         # si0 should now be gone
530         readers = ss.remote_get_buckets("si0")
531         self.failUnlessEqual(len(readers), 0)
532         # and the renew should no longer work
533         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
534
535
536         # cancel the first lease on si1, leaving the second and third in place
537         ss.remote_cancel_lease("si1", cs1)
538         readers = ss.remote_get_buckets("si1")
539         self.failUnlessEqual(len(readers), 5)
540         # the corresponding renew should no longer work
541         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
542
543         leases = list(ss.get_leases("si1"))
544         self.failUnlessEqual(len(leases), 2)
545         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
546
547         ss.remote_renew_lease("si1", rs2)
548         # cancelling the second and third should make it go away
549         ss.remote_cancel_lease("si1", cs2)
550         ss.remote_cancel_lease("si1", cs2a)
551         readers = ss.remote_get_buckets("si1")
552         self.failUnlessEqual(len(readers), 0)
553         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
554         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
555         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
556
557         leases = list(ss.get_leases("si1"))
558         self.failUnlessEqual(len(leases), 0)
559
560
561         # test overlapping uploads
562         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
563                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
564         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
565                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
566         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
567                                                      sharenums, size, canary)
568         self.failUnlessEqual(len(already), 0)
569         self.failUnlessEqual(len(writers), 5)
570         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
571                                                        sharenums, size, canary)
572         self.failUnlessEqual(len(already2), 0)
573         self.failUnlessEqual(len(writers2), 0)
574         for wb in writers.values():
575             wb.remote_close()
576
577         leases = list(ss.get_leases("si3"))
578         self.failUnlessEqual(len(leases), 1)
579
580         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
581                                                        sharenums, size, canary)
582         self.failUnlessEqual(len(already3), 5)
583         self.failUnlessEqual(len(writers3), 0)
584
585         leases = list(ss.get_leases("si3"))
586         self.failUnlessEqual(len(leases), 2)
587
588     def test_readonly(self):
589         workdir = self.workdir("test_readonly")
590         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
591         ss.setServiceParent(self.sparent)
592
593         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
594         self.failUnlessEqual(already, set())
595         self.failUnlessEqual(writers, {})
596
597         stats = ss.get_stats()
598         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
599                              False)
600         if "storage_server.disk_avail" in stats:
601             # windows does not have os.statvfs, so it doesn't give us disk
602             # stats. But if there are stats, readonly_storage means
603             # disk_avail=0
604             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
605
606     def test_discard(self):
607         # discard is really only used for other tests, but we test it anyways
608         workdir = self.workdir("test_discard")
609         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
610         ss.setServiceParent(self.sparent)
611
612         canary = FakeCanary()
613         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
614         self.failUnlessEqual(already, set())
615         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
616         for i,wb in writers.items():
617             wb.remote_write(0, "%25d" % i)
618             wb.remote_close()
619         # since we discard the data, the shares should be present but sparse.
620         # Since we write with some seeks, the data we read back will be all
621         # zeros.
622         b = ss.remote_get_buckets("vid")
623         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
624         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
625
626     def test_advise_corruption(self):
627         workdir = self.workdir("test_advise_corruption")
628         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
629         ss.setServiceParent(self.sparent)
630
631         si0_s = base32.b2a("si0")
632         ss.remote_advise_corrupt_share("immutable", "si0", 0,
633                                        "This share smells funny.\n")
634         reportdir = os.path.join(workdir, "corruption-advisories")
635         reports = os.listdir(reportdir)
636         self.failUnlessEqual(len(reports), 1)
637         report_si0 = reports[0]
638         self.failUnless(si0_s in report_si0, report_si0)
639         f = open(os.path.join(reportdir, report_si0), "r")
640         report = f.read()
641         f.close()
642         self.failUnless("type: immutable" in report)
643         self.failUnless(("storage_index: %s" % si0_s) in report)
644         self.failUnless("share_number: 0" in report)
645         self.failUnless("This share smells funny." in report)
646
647         # test the RIBucketWriter version too
648         si1_s = base32.b2a("si1")
649         already,writers = self.allocate(ss, "si1", [1], 75)
650         self.failUnlessEqual(already, set())
651         self.failUnlessEqual(set(writers.keys()), set([1]))
652         writers[1].remote_write(0, "data")
653         writers[1].remote_close()
654
655         b = ss.remote_get_buckets("si1")
656         self.failUnlessEqual(set(b.keys()), set([1]))
657         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
658
659         reports = os.listdir(reportdir)
660         self.failUnlessEqual(len(reports), 2)
661         report_si1 = [r for r in reports if si1_s in r][0]
662         f = open(os.path.join(reportdir, report_si1), "r")
663         report = f.read()
664         f.close()
665         self.failUnless("type: immutable" in report)
666         self.failUnless(("storage_index: %s" % si1_s) in report)
667         self.failUnless("share_number: 1" in report)
668         self.failUnless("This share tastes like dust." in report)
669
670
671
class MutableServer(unittest.TestCase):
    """Tests for the mutable-slot side of StorageServer: the
    testv-and-readv-and-writev protocol, lease management, and share
    removal."""

    def setUp(self):
        self.sparent = LoggingServiceParent()
        # counter used to hand out distinct lease secrets per allocation
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        # per-test scratch directory; created lazily by the StorageServer
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        """Create and start a StorageServer (nodeid of 20 zero bytes)
        rooted under workdir(name)."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        # just exercise server construction/startup
        ss = self.create("test_create")

    # deterministic secrets, derived from short tags, so tests can
    # re-derive the same secret later with the same tag

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        """Create empty mutable shares for 'sharenums' by sending a writev
        with no test vector and no data. NOTE: 'size' is currently unused."""
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        # empty testv, empty datav, no new size: just create the shares
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_container_size(self):
        """Growing the container beyond MAX_SIZE must fail; shrinking the
        requested size is accepted."""
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # 100 bytes: "0000000000111111111122..." up to "9999999999"
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        """Basic read/write round trips, bad write-enabler rejection, and
        failing test vectors leaving the data untouched."""
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        # freshly-allocated shares are empty, and reads past the end return
        # empty strings rather than raising
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write enabler must be rejected, and the error message
        # should name the nodeid that recorded the original enabler (to
        # support share migration)
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        # failed testv: (False, current-contents-of-read-vector); the write
        # must not have been applied
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            # restore share 0 to the canonical 100-byte 'data' pattern
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        """A pure-read call (empty wanted-shares list) returns data from
        every share."""
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # NOTE: this value is immediately overwritten below and never used
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        # share N holds 100 copies of the digit N
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        """Assert two (leasenum, LeaseInfo) lists match, ignoring
        expiration times (which change on renewal)."""
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        """Assert two (leasenum, LeaseInfo) lists match exactly, including
        expiration times."""
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        """Lease add/renew/cancel lifecycle on a mutable share, including
        lease survival across container expansion."""
        ss = self.create("test_leases")
        def secrets(n):
            # same write enabler throughout; distinct renew/cancel secrets
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(s0.debug_get_leases()), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 5)

        all_leases = s0.debug_get_leases()
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # writing (and thus growing the container) updates lease positions
        # but must preserve their contents
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling the final lease deletes the share
        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        """Setting a share's new size to zero deletes it; deleting the last
        share removes the bucket directory (but not the prefix dir)."""
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefix = si[:2]
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir))
        self.failIf(os.path.exists(bucketdir))
1220
1221 class Stats(unittest.TestCase):
1222
1223     def setUp(self):
1224         self.sparent = LoggingServiceParent()
1225         self._lease_secret = itertools.count()
1226     def tearDown(self):
1227         return self.sparent.stopService()
1228
1229     def workdir(self, name):
1230         basedir = os.path.join("storage", "Server", name)
1231         return basedir
1232
1233     def create(self, name):
1234         workdir = self.workdir(name)
1235         ss = StorageServer(workdir, "\x00" * 20)
1236         ss.setServiceParent(self.sparent)
1237         return ss
1238
1239     def test_latencies(self):
1240         ss = self.create("test_latencies")
1241         for i in range(10000):
1242             ss.add_latency("allocate", 1.0 * i)
1243         for i in range(1000):
1244             ss.add_latency("renew", 1.0 * i)
1245         for i in range(10):
1246             ss.add_latency("cancel", 2.0 * i)
1247         ss.add_latency("get", 5.0)
1248
1249         output = ss.get_latencies()
1250
1251         self.failUnlessEqual(sorted(output.keys()),
1252                              sorted(["allocate", "renew", "cancel", "get"]))
1253         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1254         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1255         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1256         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1257         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1258         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1259         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1260         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1261         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1262
1263         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1264         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1265         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1266         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1267         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1268         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1269         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1270         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1271         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1272
1273         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1274         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1275         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1276         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1277         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1278         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1279         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1280         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1281         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1282
1283         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1284         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1285         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1286         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1287         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1288         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1289         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1290         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1291         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1292
class NoStatvfsServer(StorageServer):
    """A StorageServer whose do_statvfs always fails, simulating platforms
    (such as windows) where os.statvfs does not exist."""
    def do_statvfs(self):
        # mimic the attribute-lookup failure of a missing os.statvfs
        raise AttributeError()
1296
1297 class WebStatus(unittest.TestCase):
1298
1299     def test_no_server(self):
1300         w = StorageStatus(None)
1301         html = w.renderSynchronously()
1302         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1303
1304
1305     def remove_tags(self, s):
1306         s = re.sub(r'<[^>]*>', ' ', s)
1307         s = re.sub(r'\s+', ' ', s)
1308         return s
1309
1310     def test_status(self):
1311         basedir = "storage/WebStatus/status"
1312         fileutil.make_dirs(basedir)
1313         ss = StorageServer(basedir, "\x00" * 20)
1314         w = StorageStatus(ss)
1315         html = w.renderSynchronously()
1316         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1317         s = self.remove_tags(html)
1318         self.failUnless("Accepting new shares: Yes" in s, s)
1319         self.failUnless("Reserved space: - 0B" in s, s)
1320
1321     def test_status_no_statvfs(self):
1322         # windows has no os.statvfs . Make sure the code handles that even on
1323         # unix.
1324         basedir = "storage/WebStatus/status_no_statvfs"
1325         fileutil.make_dirs(basedir)
1326         ss = NoStatvfsServer(basedir, "\x00" * 20)
1327         w = StorageStatus(ss)
1328         html = w.renderSynchronously()
1329         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1330         s = self.remove_tags(html)
1331         self.failUnless("Accepting new shares: Yes" in s, s)
1332         self.failUnless("Total disk space: ?" in s, s)
1333
1334     def test_readonly(self):
1335         basedir = "storage/WebStatus/readonly"
1336         fileutil.make_dirs(basedir)
1337         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
1338         w = StorageStatus(ss)
1339         html = w.renderSynchronously()
1340         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1341         s = self.remove_tags(html)
1342         self.failUnless("Accepting new shares: No" in s, s)
1343
1344     def test_reserved(self):
1345         basedir = "storage/WebStatus/reserved"
1346         fileutil.make_dirs(basedir)
1347         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1348         w = StorageStatus(ss)
1349         html = w.renderSynchronously()
1350         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1351         s = self.remove_tags(html)
1352         self.failUnless("Reserved space: - 10.00MB" in s, s)
1353
1354     def test_util(self):
1355         self.failUnlessEqual(abbreviate_if_known(None), "?")
1356         self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB")
1357         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
1358         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
1359