import time, os.path, stat, re, simplejson

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap import eventual
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
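# A stand-in for the foolscap 'canary' object passed to
# remote_allocate_buckets(): it records registered disconnect callbacks so
# tests can fire them by hand (see Server.test_disconnect), or, with
# ignore_disconnectors=True, silently drops them.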
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

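# A do-nothing stats provider; these tests only need the count() and
# register_producer() entry points to exist.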
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

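# Note: in the bucket tests below, the TestCase instance itself plays the
# storage-server role. BucketWriter/BucketReader take their parent server as
# the first constructor argument, and the only server callbacks they appear
# to need here are the bucket_writer_closed(), add_latency(), and count()
# stubs defined on the class.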
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

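    # LeaseInfo appears to take (owner_num, renew_secret, cancel_secret,
    # expiration_time, nodeid); the trailing argument is a dummy 20-byte
    # server nodeid.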
    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

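# Wraps a local object so it can be driven through the same callRemote()
# interface a foolscap RemoteReference would offer: each call is dispatched
# to the target's corresponding remote_* method and wrapped in a Deferred
# via maybeDeferred.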
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
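        # (For 4 leaves a complete binary merkle tree has 4 + 2 + 1 = 7
        # nodes, and for 8 leaves 8 + 4 + 2 + 1 = 15. A single share
        # presumably needs log2(8) = 3 of those 15 hashes to validate its
        # leaf, which is where the "3 nodes" above comes from; the tree
        # indices (1, 9, 13) used below look like arbitrary picks for the
        # test.)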

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

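# Lets a test dictate the apparent free disk space: set DISKAVAIL on the
# instance and stat_disk() reports that value instead of the result of a
# real statvfs() call.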
class FakeDiskStorageServer(StorageServer):
    def stat_disk(self, d):
        return self.DISKAVAIL

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
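        # (presumably OVERHEAD corresponds to the immutable share file's
        # three 4-byte header fields, and LEASE_SIZE to one lease record:
        # a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time)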
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # Windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

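    # convenience wrapper for the slot_testv_and_readv_and_writev API: the
    # dict maps each share number to (test vector, write vector, new
    # length), and the call returns (wrote, read_data). With empty test
    # vectors the write is unconditional, so this just creates empty slots.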
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

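    # a mutable share file begins with a magic header string; clobbering it
    # should raise UnknownMutableContainerVersionError (a specific error,
    # rather than a bare assert), and the message should include both the
    # magic that was found and the one we wanted.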
    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the testv operators; the data we're comparing against is
        # '11111' in all cases. test both failing and passing cases,
        # resetting the data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

1203     def test_remove(self):
1204         ss = self.create("test_remove")
1205         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1206                       set([0,1,2]), 100)
1207         readv = ss.remote_slot_readv
1208         writev = ss.remote_slot_testv_and_readv_and_writev
1209         secrets = ( self.write_enabler("we1"),
1210                     self.renew_secret("we1"),
1211                     self.cancel_secret("we1") )
1212         # delete sh0 by setting its size to zero
1213         answer = writev("si1", secrets,
1214                         {0: ([], [], 0)},
1215                         [])
1216         # the answer should mention all the shares that existed before the
1217         # write
1218         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1219         # but a new read should show only sh1 and sh2
1220         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1221                              {1: [""], 2: [""]})
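             # (the shares were created empty by the allocate() helper, so
             # each read vector returns the empty string; deleted shares
             # simply vanish from the result dict)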
1222
1223         # delete sh1 by setting its size to zero
1224         answer = writev("si1", secrets,
1225                         {1: ([], [], 0)},
1226                         [])
1227         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1228         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1229                              {2: [""]})
1230
1231         # delete sh2 by setting its size to zero
1232         answer = writev("si1", secrets,
1233                         {2: ([], [], 0)},
1234                         [])
1235         self.failUnlessEqual(answer, (True, {2:[]}) )
1236         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1237                              {})
1238         # and the bucket directory should now be gone
1239         si = base32.b2a("si1")
1240         # note: this is a detail of the storage server implementation, and
1241         # may change in the future
1242         prefix = si[:2]
1243         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1244         bucketdir = os.path.join(prefixdir, si)
1245         self.failUnless(os.path.exists(prefixdir))
1246         self.failIf(os.path.exists(bucketdir))
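             # i.e. shares live at <workdir>/shares/<si[:2]>/<si>/<shnum>;
             # deleting the last share removes the bucket directory but
             # leaves the prefix directory behind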
1247
1248 class Stats(unittest.TestCase):
1249
1250     def setUp(self):
1251         self.sparent = LoggingServiceParent()
1252         self._lease_secret = itertools.count()
1253     def tearDown(self):
1254         return self.sparent.stopService()
1255
1256     def workdir(self, name):
1257         basedir = os.path.join("storage", "Server", name)
1258         return basedir
1259
1260     def create(self, name):
1261         workdir = self.workdir(name)
1262         ss = StorageServer(workdir, "\x00" * 20)
1263         ss.setServiceParent(self.sparent)
1264         return ss
1265
1266     def test_latencies(self):
1267         ss = self.create("test_latencies")
1268         for i in range(10000):
1269             ss.add_latency("allocate", 1.0 * i)
1270         for i in range(1000):
1271             ss.add_latency("renew", 1.0 * i)
1272         for i in range(10):
1273             ss.add_latency("cancel", 2.0 * i)
1274         ss.add_latency("get", 5.0)
1275
1276         output = ss.get_latencies()
1277
1278         self.failUnlessEqual(sorted(output.keys()),
1279                              sorted(["allocate", "renew", "cancel", "get"]))
1280         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1281         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1282         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1283         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1284         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1285         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1286         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1287         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1288         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1289
1290         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1291         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1292         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1293         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1294         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1295         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1296         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1297         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1298         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1299
1300         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1301         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1302         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1303         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1304         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1305         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1306         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1307         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1308         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1309
1310         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1311         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1312         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1313         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1314         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1315         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1316         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1317         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1318         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
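             # These figures follow from add_latency() retaining only the
             # most recent 1000 samples per category and get_latencies()
             # indexing into the sorted sample list -- a sketch, assuming
             # that implementation:
             #   samples = sorted(ss.latencies["allocate"])  # 9000.0..9999.0
             #   p99 = samples[int(len(samples) * 0.99)]     # -> 9990.0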
1319
1320 def remove_tags(s):
1321     s = re.sub(r'<[^>]*>', ' ', s)
1322     s = re.sub(r'\s+', ' ', s)
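         # e.g. remove_tags("<p>Total buckets: 4</p>") -> " Total buckets: 4 "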
1323     return s
1324
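     # test double: fires one Deferred from hook_ds each time the crawler
     # finishes a prefix, letting tests synchronize with its progress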
1325 class MyBucketCountingCrawler(BucketCountingCrawler):
1326     def finished_prefix(self, cycle, prefix):
1327         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1328         if self.hook_ds:
1329             d = self.hook_ds.pop(0)
1330             d.callback(None)
1331
1332 class MyStorageServer(StorageServer):
1333     def add_bucket_counter(self):
1334         statefile = os.path.join(self.storedir, "bucket_counter.state")
1335         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1336         self.bucket_counter.setServiceParent(self)
1337
1338 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1339
1340     def setUp(self):
1341         self.s = service.MultiService()
1342         self.s.startService()
1343     def tearDown(self):
1344         return self.s.stopService()
1345
1346     def test_bucket_counter(self):
1347         basedir = "storage/BucketCounter/bucket_counter"
1348         fileutil.make_dirs(basedir)
1349         ss = StorageServer(basedir, "\x00" * 20)
1350         # to make sure we capture the bucket-counting-crawler in the middle
1351         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1352         # also make it start sooner than usual.
1353         ss.bucket_counter.slow_start = 0
1354         orig_cpu_slice = ss.bucket_counter.cpu_slice
1355         ss.bucket_counter.cpu_slice = 0
1356         ss.setServiceParent(self.s)
1357
1358         w = StorageStatus(ss)
1359
1360         # this sample is before the crawler has started doing anything
1361         html = w.renderSynchronously()
1362         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1363         s = remove_tags(html)
1364         self.failUnless("Accepting new shares: Yes" in s, s)
1365         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1366         self.failUnless("Total buckets: Not computed yet" in s, s)
1367         self.failUnless("Next crawl in" in s, s)
1368
1369         # give the bucket-counting-crawler one tick to get started. The
1370         # cpu_slice=0 will force it to yield right after it processes the
1371         # first prefix
1372
1373         d = eventual.fireEventually()
1374         def _check(ignored):
1375             # are we really right after the first prefix?
1376             state = ss.bucket_counter.get_state()
1377             self.failUnlessEqual(state["last-complete-prefix"],
1378                                  ss.bucket_counter.prefixes[0])
1379             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1380             html = w.renderSynchronously()
1381             s = remove_tags(html)
1382             self.failUnless(" Current crawl " in s, s)
1383             self.failUnless(" (next work in " in s, s)
1384         d.addCallback(_check)
1385
1386         # now give it enough time to complete a full cycle
1387         def _watch():
1388             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1389         d.addCallback(lambda ignored: self.poll(_watch))
1390         def _check2(ignored):
1391             ss.bucket_counter.cpu_slice = orig_cpu_slice
1392             html = w.renderSynchronously()
1393             s = remove_tags(html)
1394             self.failUnless("Total buckets: 0 (the number of" in s, s)
1395             self.failUnless("Next crawl in 59 minutes" in s, s)
1396         d.addCallback(_check2)
1397         return d
1398
1399     def test_bucket_counter_cleanup(self):
1400         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1401         fileutil.make_dirs(basedir)
1402         ss = StorageServer(basedir, "\x00" * 20)
1403         # to make sure we capture the bucket-counting-crawler in the middle
1404         # of a cycle, we reach in and reduce its maximum slice time to 0.
1405         ss.bucket_counter.slow_start = 0
1406         orig_cpu_slice = ss.bucket_counter.cpu_slice
1407         ss.bucket_counter.cpu_slice = 0
1408         ss.setServiceParent(self.s)
1409
1410         d = eventual.fireEventually()
1411
1412         def _after_first_prefix(ignored):
1413             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1414             # now sneak in and mess with its state, to make sure it cleans up
1415             # properly at the end of the cycle
1416             state = ss.bucket_counter.state
1417             self.failUnlessEqual(state["last-complete-prefix"],
1418                                  ss.bucket_counter.prefixes[0])
1419             state["bucket-counts"][-12] = {}
1420             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1421             ss.bucket_counter.save_state()
1422         d.addCallback(_after_first_prefix)
1423
1424         # now give it enough time to complete a cycle
1425         def _watch():
1426             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1427         d.addCallback(lambda ignored: self.poll(_watch))
1428         def _check2(ignored):
1429             ss.bucket_counter.cpu_slice = orig_cpu_slice
1430             s = ss.bucket_counter.get_state()
1431             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1432             self.failIf("bogusprefix!" in s["storage-index-samples"],
1433                         s["storage-index-samples"].keys())
1434         d.addCallback(_check2)
1435         return d
1436
1437     def test_bucket_counter_eta(self):
1438         basedir = "storage/BucketCounter/bucket_counter_eta"
1439         fileutil.make_dirs(basedir)
1440         ss = MyStorageServer(basedir, "\x00" * 20)
1441         ss.bucket_counter.slow_start = 0
1442         # these will be fired inside finished_prefix()
1443         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1444         w = StorageStatus(ss)
1445
1446         d = defer.Deferred()
1447
1448         def _check_1(ignored):
1449             # no ETA is available yet
1450             html = w.renderSynchronously()
1451             s = remove_tags(html)
1452             self.failUnlessIn("complete (next work", s)
1453
1454         def _check_2(ignored):
1455             # one prefix has finished, so an ETA based upon that elapsed time
1456             # should be available.
1457             html = w.renderSynchronously()
1458             s = remove_tags(html)
1459             self.failUnlessIn("complete (ETA ", s)
1460
1461         def _check_3(ignored):
1462             # two prefixes have finished
1463             html = w.renderSynchronously()
1464             s = remove_tags(html)
1465             self.failUnlessIn("complete (ETA ", s)
1466             d.callback("done")
1467
1468         hooks[0].addCallback(_check_1).addErrback(d.errback)
1469         hooks[1].addCallback(_check_2).addErrback(d.errback)
1470         hooks[2].addCallback(_check_3).addErrback(d.errback)
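             # each hook fires inside finished_prefix(), so _check_N runs
             # right after the Nth prefix completes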
1471
1472         ss.setServiceParent(self.s)
1473         return d
1474
1475 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1476     stop_after_first_bucket = False
1477     def process_bucket(self, *args, **kwargs):
1478         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1479         if self.stop_after_first_bucket:
1480             self.stop_after_first_bucket = False
1481             self.cpu_slice = -1.0
1482     def yielding(self, sleep_time):
1483         if not self.stop_after_first_bucket:
1484             self.cpu_slice = 500
1485
1486 class BrokenStatResults:
1487     pass
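     # simulates a platform whose os.stat() results lack st_blocks (see
     # test_no_st_blocks below) by copying every other stat attribute onto
     # a stand-in object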
1488 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1489     def stat(self, fn):
1490         s = os.stat(fn)
1491         bsr = BrokenStatResults()
1492         for attrname in dir(s):
1493             if attrname.startswith("_"):
1494                 continue
1495             if attrname == "st_blocks":
1496                 continue
1497             setattr(bsr, attrname, getattr(s, attrname))
1498         return bsr
1499
1500 class InstrumentedStorageServer(StorageServer):
1501     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1502 class No_ST_BLOCKS_StorageServer(StorageServer):
1503     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1504
1505 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1506
1507     def setUp(self):
1508         self.s = service.MultiService()
1509         self.s.startService()
1510     def tearDown(self):
1511         return self.s.stopService()
1512
1513     def make_shares(self, ss):
1514         def make(si):
1515             return (si, hashutil.tagged_hash("renew", si),
1516                     hashutil.tagged_hash("cancel", si))
1517         def make_mutable(si):
1518             return (si, hashutil.tagged_hash("renew", si),
1519                     hashutil.tagged_hash("cancel", si),
1520                     hashutil.tagged_hash("write-enabler", si))
1521         def make_extra_lease(si, num):
1522             return (hashutil.tagged_hash("renew-%d" % num, si),
1523                     hashutil.tagged_hash("cancel-%d" % num, si))
1524
1525         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1526         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1527         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1528         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1529         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1530         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1531         sharenums = [0]
1532         canary = FakeCanary()
1533         # note: 'tahoe debug dump-share' will not handle this file, since the
1534         # inner contents are not a valid CHK share
1535         data = "\xff" * 1000
1536
1537         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1538                                          1000, canary)
1539         w[0].remote_write(0, data)
1540         w[0].remote_close()
1541
1542         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1543                                          1000, canary)
1544         w[0].remote_write(0, data)
1545         w[0].remote_close()
1546         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1547
1548         writev = ss.remote_slot_testv_and_readv_and_writev
1549         writev(mutable_si_2, (we2, rs2, cs2),
1550                {0: ([], [(0,data)], len(data))}, [])
1551         writev(mutable_si_3, (we3, rs3, cs3),
1552                {0: ([], [(0,data)], len(data))}, [])
1553         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1554
1555         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1556         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1557         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
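             # net effect: two immutable buckets (si_0, si_1) and two mutable
             # slots (si_2, si_3); si_1 and si_3 carry a second lease, for six
             # leases across four buckets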
1558
1559     def test_basic(self):
1560         basedir = "storage/LeaseCrawler/basic"
1561         fileutil.make_dirs(basedir)
1562         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1563         # make it start sooner than usual.
1564         lc = ss.lease_checker
1565         lc.slow_start = 0
1566         lc.cpu_slice = 500
1567         lc.stop_after_first_bucket = True
1568         webstatus = StorageStatus(ss)
1569
1570         # create a few shares, with some leases on them
1571         self.make_shares(ss)
1572         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1573
1574         # add a non-sharefile to exercise another code path
1575         fn = os.path.join(ss.sharedir,
1576                           storage_index_to_dir(immutable_si_0),
1577                           "not-a-share")
1578         f = open(fn, "wb")
1579         f.write("I am not a share.\n")
1580         f.close()
1581
1582         # this is before the crawl has started, so we're not in a cycle yet
1583         initial_state = lc.get_state()
1584         self.failIf(lc.get_progress()["cycle-in-progress"])
1585         self.failIf("cycle-to-date" in initial_state)
1586         self.failIf("estimated-remaining-cycle" in initial_state)
1587         self.failIf("estimated-current-cycle" in initial_state)
1588         self.failUnless("history" in initial_state)
1589         self.failUnlessEqual(initial_state["history"], {})
1590
1591         ss.setServiceParent(self.s)
1592
1593         d = eventual.fireEventually()
1594
1595         # now examine the state right after the first bucket has been
1596         # processed.
1597         def _after_first_bucket(ignored):
1598             initial_state = lc.get_state()
1599             self.failUnless("cycle-to-date" in initial_state)
1600             self.failUnless("estimated-remaining-cycle" in initial_state)
1601             self.failUnless("estimated-current-cycle" in initial_state)
1602             self.failUnless("history" in initial_state)
1603             self.failUnlessEqual(initial_state["history"], {})
1604
1605             so_far = initial_state["cycle-to-date"]
1606             self.failUnlessEqual(so_far["expiration-enabled"], False)
1607             self.failUnless("configured-expiration-time" in so_far)
1608             self.failUnless("lease-age-histogram" in so_far)
1609             lah = so_far["lease-age-histogram"]
1610             self.failUnlessEqual(type(lah), list)
1611             self.failUnlessEqual(len(lah), 1)
1612             self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 1) ] )
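                 # each histogram entry is a (min-age, max-age, count) tuple;
                 # the one lease seen so far is brand new, so it lands in the
                 # first bucket, which spans the first tenth of the age limit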
1613             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1614             self.failUnlessEqual(so_far["buckets-examined"], 1)
1615             self.failUnlessEqual(so_far["shares-examined"], 1)
1616             sr1 = so_far["space-recovered"]
1617             self.failUnlessEqual(sr1["actual-numshares"], 0)
1618             self.failUnlessEqual(sr1["configured-leasetimer-diskbytes"], 0)
1619             self.failUnlessEqual(sr1["original-leasetimer-sharebytes"], 0)
1620             left = initial_state["estimated-remaining-cycle"]
1621             self.failUnless(left["buckets-examined"] > 0,
1622                             left["buckets-examined"])
1623             self.failUnless(left["shares-examined"] > 0,
1624                             left["shares-examined"])
1625             sr2 = left["space-recovered"]
1626             self.failIfEqual(sr2["actual-numshares"], None)
1627             self.failIfEqual(sr2["configured-leasetimer-diskbytes"], None)
1628             self.failIfEqual(sr2["original-leasetimer-sharebytes"], None)
1629         d.addCallback(_after_first_bucket)
1630         d.addCallback(lambda ign: self.render1(webstatus))
1631         def _check_html_in_cycle(html):
1632             s = remove_tags(html)
1633             self.failUnlessIn("So far, this cycle has examined "
1634                               "1 shares in 1 buckets "
1635                               "and has recovered: "
1636                               "0 shares, 0 buckets, 0 B ", s)
1637             self.failUnlessIn("If expiration were enabled, "
1638                               "we would have recovered: "
1639                               "0 shares, 0 buckets, 0 B by now", s)
1640             self.failUnlessIn("and the remainder of this cycle "
1641                               "would probably recover: "
1642                               "0 shares, 0 buckets, 0 B ", s)
1643             self.failUnlessIn("and the whole cycle would probably recover: "
1644                               "0 shares, 0 buckets, 0 B ", s)
1645             self.failUnlessIn("if we were using each lease's default "
1646                               "31-day lease lifetime", s)
1647             self.failUnlessIn("this cycle would be expected to recover: ", s)
1648         d.addCallback(_check_html_in_cycle)
1649
1650         # wait for the crawler to finish the first cycle. Nothing should have
1651         # been removed.
1652         def _wait():
1653             return bool(lc.get_state()["last-cycle-finished"] is not None)
1654         d.addCallback(lambda ign: self.poll(_wait))
1655
1656         def _after_first_cycle(ignored):
1657             s = lc.get_state()
1658             self.failIf("cycle-to-date" in s)
1659             self.failIf("estimated-remaining-cycle" in s)
1660             self.failIf("estimated-current-cycle" in s)
1661             last = s["history"][0]
1662             self.failUnless("cycle-start-finish-times" in last)
1663             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1664             self.failUnlessEqual(last["expiration-enabled"], False)
1665             self.failUnless("configured-expiration-time" in last)
1666
1667             self.failUnless("lease-age-histogram" in last)
1668             lah = last["lease-age-histogram"]
1669             self.failUnlessEqual(type(lah), list)
1670             self.failUnlessEqual(len(lah), 1)
1671             self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 6) ] )
1672
1673             self.failUnlessEqual(last["leases-per-share-histogram"],
1674                                  {1: 2, 2: 2})
1675             self.failUnlessEqual(last["buckets-examined"], 4)
1676             self.failUnlessEqual(last["shares-examined"], 4)
1677
1678             rec = last["space-recovered"]
1679             self.failUnlessEqual(rec["actual-numbuckets"], 0)
1680             self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0)
1681             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 0)
1682             self.failUnlessEqual(rec["actual-numshares"], 0)
1683             self.failUnlessEqual(rec["original-leasetimer-numshares"], 0)
1684             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 0)
1685             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1686             self.failUnlessEqual(rec["original-leasetimer-diskbytes"], 0)
1687             self.failUnlessEqual(rec["configured-leasetimer-diskbytes"], 0)
1688             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1689             self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0)
1690             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], 0)
1691
1692             def _get_sharefile(si):
1693                 return list(ss._iter_share_files(si))[0]
1694             def count_leases(si):
1695                 return len(list(_get_sharefile(si).get_leases()))
1696             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1697             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1698             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1699             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1700         d.addCallback(_after_first_cycle)
1701         d.addCallback(lambda ign: self.render1(webstatus))
1702         def _check_html(html):
1703             s = remove_tags(html)
1704             self.failUnlessIn("recovered: 0 shares, 0 buckets, 0 B "
1705                               "but expiration was not enabled", s)
1706         d.addCallback(_check_html)
1707         return d
1708
1709     def backdate_lease(self, sf, renew_secret, new_expire_time):
1710         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1711         # "renew" a lease with a new_expire_time that is older than what the
1712         # current lease has), so we have to reach inside it.
1713         for i,lease in enumerate(sf.get_leases()):
1714             if lease.renew_secret == renew_secret:
1715                 lease.expiration_time = new_expire_time
1716                 f = open(sf.home, 'rb+')
1717                 sf._write_lease_record(f, i, lease)
1718                 f.close()
1719                 return
1720         raise IndexError("unable to renew non-existent lease")
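             # usage sketch: backdate_lease(sf, rs, time.time() - 1000) makes
             # the lease matching renew-secret rs look 1000 seconds old (see
             # test_expire below)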
1721
1722     def test_expire(self):
1723         basedir = "storage/LeaseCrawler/expire"
1724         fileutil.make_dirs(basedir)
1725         # setting expiration_time to 2000 means that any lease which is more
1726         # than 2000s old will be expired.
1727         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1728                                        expire_leases=True,
1729                                        expiration_time=2000)
1730         # make it start sooner than usual.
1731         lc = ss.lease_checker
1732         lc.slow_start = 0
1733         lc.stop_after_first_bucket = True
1734         webstatus = StorageStatus(ss)
1735
1736         # create a few shares, with some leases on them
1737         self.make_shares(ss)
1738         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1739
1740         def count_shares(si):
1741             return len(list(ss._iter_share_files(si)))
1742         def _get_sharefile(si):
1743             return list(ss._iter_share_files(si))[0]
1744         def count_leases(si):
1745             return len(list(_get_sharefile(si).get_leases()))
1746
1747         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1748         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1749         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1750         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1751         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1752         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1753         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1754         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1755
1756         # artificially crank back the expiration time on the first lease of
1757         # each share, to make it look like it expired already (age=1000s).
1758         # Some shares have an extra lease which is set to expire at the
1759         # default time in 31 days from now (age=31days). We then run the
1760         # crawler, which will expire the first lease, making some shares get
1761         # deleted and others stay alive (with one remaining lease)
1762         now = time.time()
1763
1764         sf0 = _get_sharefile(immutable_si_0)
1765         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1766         sf0_size = os.stat(sf0.home).st_size
1767
1768         # immutable_si_1 gets an extra lease
1769         sf1 = _get_sharefile(immutable_si_1)
1770         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1771
1772         sf2 = _get_sharefile(mutable_si_2)
1773         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1774         sf2_size = os.stat(sf2.home).st_size
1775
1776         # mutable_si_3 gets an extra lease
1777         sf3 = _get_sharefile(mutable_si_3)
1778         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1779
1780         ss.setServiceParent(self.s)
1781
1782         d = eventual.fireEventually()
1783         # examine the state right after the first bucket has been processed
1784         def _after_first_bucket(ignored):
1785             p = lc.get_progress()
1786             self.failUnless(p["cycle-in-progress"])
1787         d.addCallback(_after_first_bucket)
1788         d.addCallback(lambda ign: self.render1(webstatus))
1789         def _check_html_in_cycle(html):
1790             s = remove_tags(html)
1791             # the first bucket encountered gets deleted, and its prefix
1792             # happens to be about 1/6th of the way through the ring, so the
1793             # predictor thinks we'll have 6 shares and that we'll delete them
1794             # all. This part of the test depends upon the SIs landing right
1795             # where they do now.
1796             self.failUnlessIn("The remainder of this cycle is expected to "
1797                               "recover: 5 shares, 5 buckets", s)
1798             self.failUnlessIn("The whole cycle is expected to examine "
1799                               "6 shares in 6 buckets and to recover: "
1800                               "6 shares, 6 buckets", s)
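                 # arithmetic: 1 bucket seen at ~1/6 of the ring extrapolates
                 # to 6 buckets/6 shares for the whole cycle; the 1 already
                 # recovered leaves 5 for the remainder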
1801         d.addCallback(_check_html_in_cycle)
1802
1803         # wait for the crawler to finish the first cycle. Two shares should
1804         # have been removed
1805         def _wait():
1806             return bool(lc.get_state()["last-cycle-finished"] is not None)
1807         d.addCallback(lambda ign: self.poll(_wait))
1808
1809         def _after_first_cycle(ignored):
1810             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1811             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1812             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1813             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1814             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1815             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1816
1817             s = lc.get_state()
1818             last = s["history"][0]
1819
1820             self.failUnlessEqual(last["expiration-enabled"], True)
1821             self.failUnlessEqual(last["configured-expiration-time"], 2000)
1822             self.failUnlessEqual(last["buckets-examined"], 4)
1823             self.failUnlessEqual(last["shares-examined"], 4)
1824             self.failUnlessEqual(last["leases-per-share-histogram"],
1825                                  {1: 2, 2: 2})
1826
1827             rec = last["space-recovered"]
1828             self.failUnlessEqual(rec["actual-numbuckets"], 2)
1829             self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 2)
1830             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2)
1831             self.failUnlessEqual(rec["actual-numshares"], 2)
1832             self.failUnlessEqual(rec["original-leasetimer-numshares"], 2)
1833             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2)
1834             size = sf0_size + sf2_size
1835             self.failUnlessEqual(rec["actual-sharebytes"], size)
1836             self.failUnlessEqual(rec["original-leasetimer-sharebytes"], size)
1837             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size)
1838             # different platforms have different notions of "blocks used by
1839             # this file", so merely assert that it's a number
1840             self.failUnless(rec["actual-diskbytes"] >= 0,
1841                             rec["actual-diskbytes"])
1842             self.failUnless(rec["original-leasetimer-diskbytes"] >= 0,
1843                             rec["original-leasetimer-diskbytes"])
1844             self.failUnless(rec["configured-leasetimer-diskbytes"] >= 0,
1845                             rec["configured-leasetimer-diskbytes"])
1846         d.addCallback(_after_first_cycle)
1847         d.addCallback(lambda ign: self.render1(webstatus))
1848         def _check_html(html):
1849             s = remove_tags(html)
1850             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1851             self.failUnlessIn(" recovered: 2 shares, 2 buckets, ", s)
1852         d.addCallback(_check_html)
1853         return d
1854
1855     def test_limited_history(self):
1856         basedir = "storage/LeaseCrawler/limited_history"
1857         fileutil.make_dirs(basedir)
1858         ss = StorageServer(basedir, "\x00" * 20)
1859         # make it start sooner than usual.
1860         lc = ss.lease_checker
1861         lc.slow_start = 0
1862         lc.cpu_slice = 500
1863
1864         # create a few shares, with some leases on them
1865         self.make_shares(ss)
1866
1867         ss.setServiceParent(self.s)
1868
1869         def _wait_until_15_cycles_done():
1870             last = lc.state["last-cycle-finished"]
1871             if last is not None and last >= 15:
1872                 return True
1873             if lc.timer:
1874                 lc.timer.reset(0)
1875             return False
1876         d = self.poll(_wait_until_15_cycles_done)
1877
1878         def _check(ignored):
1879             s = lc.get_state()
1880             h = s["history"]
1881             self.failUnlessEqual(len(h), 10)
1882             self.failUnlessEqual(max(h.keys()), 15)
1883             self.failUnlessEqual(min(h.keys()), 6)
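                 # i.e. the crawler keeps only the 10 most recent completed
                 # cycles, so cycles 0 through 5 have been dropped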
1884         d.addCallback(_check)
1885         return d
1886
1887     def test_unpredictable_future(self):
1888         basedir = "storage/LeaseCrawler/unpredictable_future"
1889         fileutil.make_dirs(basedir)
1890         ss = StorageServer(basedir, "\x00" * 20)
1891         # make it start sooner than usual.
1892         lc = ss.lease_checker
1893         lc.slow_start = 0
1894         lc.cpu_slice = -1.0 # stop quickly
1895
1896         self.make_shares(ss)
1897
1898         ss.setServiceParent(self.s)
1899
1900         d = eventual.fireEventually()
1901         def _check(ignored):
1902             # this should fire after the first bucket is complete, but before
1903             # the first prefix is complete, so the progress-measurer won't
1904             # think we've gotten far enough to raise our percent-complete
1905             # above 0%, triggering the cannot-predict-the-future code in
1906             # expirer.py . This will have to change if/when the
1907             # progress-measurer gets smart enough to count buckets (we'll
1908             # have to interrupt it even earlier, before it's finished the
1909             # first bucket).
1910             s = lc.get_state()
1911             self.failUnless("cycle-to-date" in s)
1912             self.failUnless("estimated-remaining-cycle" in s)
1913             self.failUnless("estimated-current-cycle" in s)
1914
1915             left = s["estimated-remaining-cycle"]["space-recovered"]
1916             self.failUnlessEqual(left["actual-numbuckets"], None)
1917             self.failUnlessEqual(left["original-leasetimer-numbuckets"], None)
1918             self.failUnlessEqual(left["configured-leasetimer-numbuckets"], None)
1919             self.failUnlessEqual(left["actual-numshares"], None)
1920             self.failUnlessEqual(left["original-leasetimer-numshares"], None)
1921             self.failUnlessEqual(left["configured-leasetimer-numshares"], None)
1922             self.failUnlessEqual(left["actual-diskbytes"], None)
1923             self.failUnlessEqual(left["original-leasetimer-diskbytes"], None)
1924             self.failUnlessEqual(left["configured-leasetimer-diskbytes"], None)
1925             self.failUnlessEqual(left["actual-sharebytes"], None)
1926             self.failUnlessEqual(left["original-leasetimer-sharebytes"], None)
1927             self.failUnlessEqual(left["configured-leasetimer-sharebytes"], None)
1928
1929             full = s["estimated-current-cycle"]["space-recovered"]
1930             self.failUnlessEqual(full["actual-numbuckets"], None)
1931             self.failUnlessEqual(full["original-leasetimer-numbuckets"], None)
1932             self.failUnlessEqual(full["configured-leasetimer-numbuckets"], None)
1933             self.failUnlessEqual(full["actual-numshares"], None)
1934             self.failUnlessEqual(full["original-leasetimer-numshares"], None)
1935             self.failUnlessEqual(full["configured-leasetimer-numshares"], None)
1936             self.failUnlessEqual(full["actual-diskbytes"], None)
1937             self.failUnlessEqual(full["original-leasetimer-diskbytes"], None)
1938             self.failUnlessEqual(full["configured-leasetimer-diskbytes"], None)
1939             self.failUnlessEqual(full["actual-sharebytes"], None)
1940             self.failUnlessEqual(full["original-leasetimer-sharebytes"], None)
1941             self.failUnlessEqual(full["configured-leasetimer-sharebytes"], None)
1942
1943         d.addCallback(_check)
1944         return d
1945
1946     def test_no_st_blocks(self):
1947         basedir = "storage/LeaseCrawler/no_st_blocks"
1948         fileutil.make_dirs(basedir)
1949         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
1950                                         expiration_time=-1000)
1951         # a negative expiration_time= means the "configured-leasetimer-"
1952         # space-recovered counts will be non-zero, since every lease is
1953         # treated as already expired
1954
1955         # make it start sooner than usual.
1956         lc = ss.lease_checker
1957         lc.slow_start = 0
1958
1959         self.make_shares(ss)
1960         ss.setServiceParent(self.s)
1961         def _wait():
1962             return bool(lc.get_state()["last-cycle-finished"] is not None)
1963         d = self.poll(_wait)
1964
1965         def _check(ignored):
1966             s = lc.get_state()
1967             last = s["history"][0]
1968             rec = last["space-recovered"]
1969             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 4)
1970             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 4)
1971             self.failUnless(rec["configured-leasetimer-sharebytes"] > 0,
1972                             rec["configured-leasetimer-sharebytes"])
1973             # without the .st_blocks field in os.stat() results, we should be
1974             # reporting diskbytes==sharebytes
1975             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"],
1976                                  rec["configured-leasetimer-diskbytes"])
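                 # (on platforms that do provide st_blocks, diskbytes counts
                 # allocated blocks and may differ from sharebytes)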
1977         d.addCallback(_check)
1978         return d
1979
1980 class NoStatvfsServer(StorageServer):
1981     def do_statvfs(self):
1982         raise AttributeError
1983
1984 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1985
1986     def setUp(self):
1987         self.s = service.MultiService()
1988         self.s.startService()
1989     def tearDown(self):
1990         return self.s.stopService()
1991
1992     def test_no_server(self):
1993         w = StorageStatus(None)
1994         html = w.renderSynchronously()
1995         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1996
1997     def test_status(self):
1998         basedir = "storage/WebStatus/status"
1999         fileutil.make_dirs(basedir)
2000         ss = StorageServer(basedir, "\x00" * 20)
2001         ss.setServiceParent(self.s)
2002         w = StorageStatus(ss)
2003         d = self.render1(w)
2004         def _check_html(html):
2005             self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2006             s = remove_tags(html)
2007             self.failUnless("Accepting new shares: Yes" in s, s)
2008             self.failUnless("Reserved space: - 0 B (0)" in s, s)
2009         d.addCallback(_check_html)
2010         d.addCallback(lambda ign: self.render_json(w))
2011         def _check_json(json):
2012             data = simplejson.loads(json)
2013             s = data["stats"]
2014             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2015             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2016             self.failUnless("bucket-counter" in data)
2017             self.failUnless("lease-checker" in data)
2018         d.addCallback(_check_json)
2019         return d
2020
2021     def render_json(self, page):
2022         d = self.render1(page, args={"t": ["json"]})
2023         return d
2024
2025     def test_status_no_statvfs(self):
2026         # Windows has no os.statvfs. Make sure the code handles that even
2027         # on Unix.
2028         basedir = "storage/WebStatus/status_no_statvfs"
2029         fileutil.make_dirs(basedir)
2030         ss = NoStatvfsServer(basedir, "\x00" * 20)
2031         ss.setServiceParent(self.s)
2032         w = StorageStatus(ss)
2033         html = w.renderSynchronously()
2034         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2035         s = remove_tags(html)
2036         self.failUnless("Accepting new shares: Yes" in s, s)
2037         self.failUnless("Total disk space: ?" in s, s)
2038
2039     def test_readonly(self):
2040         basedir = "storage/WebStatus/readonly"
2041         fileutil.make_dirs(basedir)
2042         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2043         ss.setServiceParent(self.s)
2044         w = StorageStatus(ss)
2045         html = w.renderSynchronously()
2046         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2047         s = remove_tags(html)
2048         self.failUnless("Accepting new shares: No" in s, s)
2049
2050     def test_reserved(self):
2051         basedir = "storage/WebStatus/reserved"
2052         fileutil.make_dirs(basedir)
2053         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2054         ss.setServiceParent(self.s)
2055         w = StorageStatus(ss)
2056         html = w.renderSynchronously()
2057         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2058         s = remove_tags(html)
2059         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2060
2061     def test_huge_reserved(self):
2062         basedir = "storage/WebStatus/huge_reserved"
2063         fileutil.make_dirs(basedir)
2064         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e12)
2065         ss.setServiceParent(self.s)
2066         w = StorageStatus(ss)
2067         html = w.renderSynchronously()
2068         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2069         s = remove_tags(html)
2070         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2071
2072     def test_util(self):
2073         w = StorageStatus(None)
2074         self.failUnlessEqual(w.render_space(None, None), "?")
2075         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2076         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2077         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2078         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2079         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2080