]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
add utility function to parse durations, for lease-expiration config
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 import time, os.path, stat, re, simplejson, struct
3
4 from twisted.trial import unittest
5
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
9 import itertools
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12 from allmydata.storage.server import StorageServer
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17 from allmydata.storage.lease import LeaseInfo
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
21      ReadBucketProxy
22 from allmydata.interfaces import BadWriteEnablerError
23 from allmydata.test.common import LoggingServiceParent
24 from allmydata.test.common_web import WebRenderingMixin
25 from allmydata.web.storage import StorageStatus, remove_prefix
26
27 class Marker:
28     pass
29 class FakeCanary:
30     def __init__(self, ignore_disconnectors=False):
31         self.ignore = ignore_disconnectors
32         self.disconnectors = {}
33     def notifyOnDisconnect(self, f, *args, **kwargs):
34         if self.ignore:
35             return
36         m = Marker()
37         self.disconnectors[m] = (f, args, kwargs)
38         return m
39     def dontNotifyOnDisconnect(self, marker):
40         if self.ignore:
41             return
42         del self.disconnectors[marker]
43
44 class FakeStatsProvider:
45     def count(self, name, delta=1):
46         pass
47     def register_producer(self, producer):
48         pass
49
50 class Bucket(unittest.TestCase):
51     def make_workdir(self, name):
52         basedir = os.path.join("storage", "Bucket", name)
53         incoming = os.path.join(basedir, "tmp", "bucket")
54         final = os.path.join(basedir, "bucket")
55         fileutil.make_dirs(basedir)
56         fileutil.make_dirs(os.path.join(basedir, "tmp"))
57         return incoming, final
58
59     def bucket_writer_closed(self, bw, consumed):
60         pass
61     def add_latency(self, category, latency):
62         pass
63     def count(self, name, delta=1):
64         pass
65
66     def make_lease(self):
67         owner_num = 0
68         renew_secret = os.urandom(32)
69         cancel_secret = os.urandom(32)
70         expiration_time = time.time() + 5000
71         return LeaseInfo(owner_num, renew_secret, cancel_secret,
72                          expiration_time, "\x00" * 20)
73
74     def test_create(self):
75         incoming, final = self.make_workdir("test_create")
76         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
77                           FakeCanary())
78         bw.remote_write(0, "a"*25)
79         bw.remote_write(25, "b"*25)
80         bw.remote_write(50, "c"*25)
81         bw.remote_write(75, "d"*7)
82         bw.remote_close()
83
84     def test_readwrite(self):
85         incoming, final = self.make_workdir("test_readwrite")
86         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
87                           FakeCanary())
88         bw.remote_write(0, "a"*25)
89         bw.remote_write(25, "b"*25)
90         bw.remote_write(50, "c"*7) # last block may be short
91         bw.remote_close()
92
93         # now read from it
94         br = BucketReader(self, bw.finalhome)
95         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
98
99 class RemoteBucket:
100
101     def callRemote(self, methname, *args, **kwargs):
102         def _call():
103             meth = getattr(self.target, "remote_" + methname)
104             return meth(*args, **kwargs)
105         return defer.maybeDeferred(_call)
106
107 class BucketProxy(unittest.TestCase):
108     def make_bucket(self, name, size):
109         basedir = os.path.join("storage", "BucketProxy", name)
110         incoming = os.path.join(basedir, "tmp", "bucket")
111         final = os.path.join(basedir, "bucket")
112         fileutil.make_dirs(basedir)
113         fileutil.make_dirs(os.path.join(basedir, "tmp"))
114         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
115                           FakeCanary())
116         rb = RemoteBucket()
117         rb.target = bw
118         return bw, rb, final
119
120     def make_lease(self):
121         owner_num = 0
122         renew_secret = os.urandom(32)
123         cancel_secret = os.urandom(32)
124         expiration_time = time.time() + 5000
125         return LeaseInfo(owner_num, renew_secret, cancel_secret,
126                          expiration_time, "\x00" * 20)
127
128     def bucket_writer_closed(self, bw, consumed):
129         pass
130     def add_latency(self, category, latency):
131         pass
132     def count(self, name, delta=1):
133         pass
134
135     def test_create(self):
136         bw, rb, sharefname = self.make_bucket("test_create", 500)
137         bp = WriteBucketProxy(rb,
138                               data_size=300,
139                               block_size=10,
140                               num_segments=5,
141                               num_share_hashes=3,
142                               uri_extension_size_max=500, nodeid=None)
143         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
144
145     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146         # Let's pretend each share has 100 bytes of data, and that there are
147         # 4 segments (25 bytes each), and 8 shares total. So the two
148         # per-segment merkle trees (crypttext_hash_tree,
149         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
150         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
152         # long. That should make the whole share:
153         #
154         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
156
157         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
158
159         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
160                             for i in range(7)]
161         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
162                         for i in range(7)]
163         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
164                         for i in (1,9,13)]
165         uri_extension = "s" + "E"*498 + "e"
166
167         bw, rb, sharefname = self.make_bucket(name, sharesize)
168         bp = wbp_class(rb,
169                        data_size=95,
170                        block_size=25,
171                        num_segments=4,
172                        num_share_hashes=3,
173                        uri_extension_size_max=len(uri_extension),
174                        nodeid=None)
175
176         d = bp.put_header()
177         d.addCallback(lambda res: bp.put_block(0, "a"*25))
178         d.addCallback(lambda res: bp.put_block(1, "b"*25))
179         d.addCallback(lambda res: bp.put_block(2, "c"*25))
180         d.addCallback(lambda res: bp.put_block(3, "d"*20))
181         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185         d.addCallback(lambda res: bp.close())
186
187         # now read everything back
188         def _start_reading(res):
189             br = BucketReader(self, sharefname)
190             rb = RemoteBucket()
191             rb.target = br
192             rbp = rbp_class(rb, peerid="abc", storage_index="")
193             self.failUnless("to peer" in repr(rbp))
194             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
195
196             d1 = rbp.get_block_data(0, 25, 25)
197             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
204
205             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206             d1.addCallback(lambda res:
207                            self.failUnlessEqual(res, crypttext_hashes))
208             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210             d1.addCallback(lambda res: rbp.get_share_hashes())
211             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212             d1.addCallback(lambda res: rbp.get_uri_extension())
213             d1.addCallback(lambda res:
214                            self.failUnlessEqual(res, uri_extension))
215
216             return d1
217
218         d.addCallback(_start_reading)
219
220         return d
221
222     def test_readwrite_v1(self):
223         return self._do_test_readwrite("test_readwrite_v1",
224                                        0x24, WriteBucketProxy, ReadBucketProxy)
225
226     def test_readwrite_v2(self):
227         return self._do_test_readwrite("test_readwrite_v2",
228                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
229
230 class FakeDiskStorageServer(StorageServer):
231     def stat_disk(self, d):
232         return self.DISKAVAIL
233
234 class Server(unittest.TestCase):
235
236     def setUp(self):
237         self.sparent = LoggingServiceParent()
238         self.sparent.startService()
239         self._lease_secret = itertools.count()
240     def tearDown(self):
241         return self.sparent.stopService()
242
243     def workdir(self, name):
244         basedir = os.path.join("storage", "Server", name)
245         return basedir
246
247     def create(self, name, reserved_space=0, klass=StorageServer):
248         workdir = self.workdir(name)
249         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
250                    stats_provider=FakeStatsProvider())
251         ss.setServiceParent(self.sparent)
252         return ss
253
254     def test_create(self):
255         ss = self.create("test_create")
256
257     def allocate(self, ss, storage_index, sharenums, size, canary=None):
258         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
260         if not canary:
261             canary = FakeCanary()
262         return ss.remote_allocate_buckets(storage_index,
263                                           renew_secret, cancel_secret,
264                                           sharenums, size, canary)
265
266     def test_large_share(self):
267         ss = self.create("test_large_share")
268
269         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
270         self.failUnlessEqual(already, set())
271         self.failUnlessEqual(set(writers.keys()), set([0]))
272
273         shnum, bucket = writers.items()[0]
274         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
275         bucket.remote_write(2**32, "ab")
276         bucket.remote_close()
277
278         readers = ss.remote_get_buckets("allocate")
279         reader = readers[shnum]
280         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
281     test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
282
283     def test_dont_overfill_dirs(self):
284         """
285         This test asserts that if you add a second share whose storage index
286         share lots of leading bits with an extant share (but isn't the exact
287         same storage index), this won't add an entry to the share directory.
288         """
289         ss = self.create("test_dont_overfill_dirs")
290         already, writers = self.allocate(ss, "storageindex", [0], 10)
291         for i, wb in writers.items():
292             wb.remote_write(0, "%10d" % i)
293             wb.remote_close()
294         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
295                                 "shares")
296         children_of_storedir = set(os.listdir(storedir))
297
298         # Now store another one under another storageindex that has leading
299         # chars the same as the first storageindex.
300         already, writers = self.allocate(ss, "storageindey", [0], 10)
301         for i, wb in writers.items():
302             wb.remote_write(0, "%10d" % i)
303             wb.remote_close()
304         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
305                                 "shares")
306         new_children_of_storedir = set(os.listdir(storedir))
307         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
308
309     def test_remove_incoming(self):
310         ss = self.create("test_remove_incoming")
311         already, writers = self.allocate(ss, "vid", range(3), 10)
312         for i,wb in writers.items():
313             wb.remote_write(0, "%10d" % i)
314             wb.remote_close()
315         incoming_share_dir = wb.incominghome
316         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
317         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
318         incoming_dir = os.path.dirname(incoming_prefix_dir)
319         self.failIf(os.path.exists(incoming_bucket_dir))
320         self.failIf(os.path.exists(incoming_prefix_dir))
321         self.failUnless(os.path.exists(incoming_dir))
322
323     def test_allocate(self):
324         ss = self.create("test_allocate")
325
326         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
327
328         canary = FakeCanary()
329         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330         self.failUnlessEqual(already, set())
331         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
332
333         # while the buckets are open, they should not count as readable
334         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
335
336         # close the buckets
337         for i,wb in writers.items():
338             wb.remote_write(0, "%25d" % i)
339             wb.remote_close()
340             # aborting a bucket that was already closed is a no-op
341             wb.remote_abort()
342
343         # now they should be readable
344         b = ss.remote_get_buckets("allocate")
345         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
347         b_str = str(b[0])
348         self.failUnless("BucketReader" in b_str, b_str)
349         self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
350
351         # now if we ask about writing again, the server should offer those
352         # three buckets as already present. It should offer them even if we
353         # don't ask about those specific ones.
354         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355         self.failUnlessEqual(already, set([0,1,2]))
356         self.failUnlessEqual(set(writers.keys()), set([3,4]))
357
358         # while those two buckets are open for writing, the server should
359         # refuse to offer them to uploaders
360
361         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362         self.failUnlessEqual(already2, set([0,1,2]))
363         self.failUnlessEqual(set(writers2.keys()), set([5]))
364
365         # aborting the writes should remove the tempfiles
366         for i,wb in writers2.items():
367             wb.remote_abort()
368         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369         self.failUnlessEqual(already2, set([0,1,2]))
370         self.failUnlessEqual(set(writers2.keys()), set([5]))
371
372         for i,wb in writers2.items():
373             wb.remote_abort()
374         for i,wb in writers.items():
375             wb.remote_abort()
376
377     def test_bad_container_version(self):
378         ss = self.create("test_bad_container_version")
379         a,w = self.allocate(ss, "si1", [0], 10)
380         w[0].remote_write(0, "\xff"*10)
381         w[0].remote_close()
382
383         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
384         f = open(fn, "rb+")
385         f.seek(0)
386         f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
387         f.close()
388
389         b = ss.remote_get_buckets("allocate")
390
391         e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392                                   ss.remote_get_buckets, "si1")
393         self.failUnless(" had version 0 but we wanted 1" in str(e), e)
394
395     def test_disconnect(self):
396         # simulate a disconnection
397         ss = self.create("test_disconnect")
398         canary = FakeCanary()
399         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400         self.failUnlessEqual(already, set())
401         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402         for (f,args,kwargs) in canary.disconnectors.values():
403             f(*args, **kwargs)
404         del already
405         del writers
406
407         # that ought to delete the incoming shares
408         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409         self.failUnlessEqual(already, set())
410         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
411
412     def test_reserved_space(self):
413         ss = self.create("test_reserved_space", reserved_space=10000,
414                          klass=FakeDiskStorageServer)
415         # the FakeDiskStorageServer doesn't do real statvfs() calls
416         ss.DISKAVAIL = 15000
417         # 15k available, 10k reserved, leaves 5k for shares
418
419         # a newly created and filled share incurs this much overhead, beyond
420         # the size we request.
421         OVERHEAD = 3*4
422         LEASE_SIZE = 4+32+32+4
423         canary = FakeCanary(True)
424         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425         self.failUnlessEqual(len(writers), 3)
426         # now the StorageServer should have 3000 bytes provisionally
427         # allocated, allowing only 2000 more to be claimed
428         self.failUnlessEqual(len(ss._active_writers), 3)
429
430         # allocating 1001-byte shares only leaves room for one
431         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432         self.failUnlessEqual(len(writers2), 1)
433         self.failUnlessEqual(len(ss._active_writers), 4)
434
435         # we abandon the first set, so their provisional allocation should be
436         # returned
437         del already
438         del writers
439         self.failUnlessEqual(len(ss._active_writers), 1)
440         # now we have a provisional allocation of 1001 bytes
441
442         # and we close the second set, so their provisional allocation should
443         # become real, long-term allocation, and grows to include the
444         # overhead.
445         for bw in writers2.values():
446             bw.remote_write(0, "a"*25)
447             bw.remote_close()
448         del already2
449         del writers2
450         del bw
451         self.failUnlessEqual(len(ss._active_writers), 0)
452
453         allocated = 1001 + OVERHEAD + LEASE_SIZE
454
455         # we have to manually increase DISKAVAIL, since we're not doing real
456         # disk measurements
457         ss.DISKAVAIL -= allocated
458
459         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
461         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462         self.failUnlessEqual(len(writers3), 39)
463         self.failUnlessEqual(len(ss._active_writers), 39)
464
465         del already3
466         del writers3
467         self.failUnlessEqual(len(ss._active_writers), 0)
468         ss.disownServiceParent()
469         del ss
470
471     def test_seek(self):
472         basedir = self.workdir("test_seek_behavior")
473         fileutil.make_dirs(basedir)
474         filename = os.path.join(basedir, "testfile")
475         f = open(filename, "wb")
476         f.write("start")
477         f.close()
478         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
479         # files. mode="a" preserves previous contents but does not allow
480         # seeking-to-create-holes. mode="r+" allows both.
481         f = open(filename, "rb+")
482         f.seek(100)
483         f.write("100")
484         f.close()
485         filelen = os.stat(filename)[stat.ST_SIZE]
486         self.failUnlessEqual(filelen, 100+3)
487         f2 = open(filename, "rb")
488         self.failUnlessEqual(f2.read(5), "start")
489
490
491     def test_leases(self):
492         ss = self.create("test_leases")
493         canary = FakeCanary()
494         sharenums = range(5)
495         size = 100
496
497         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
498                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
499         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
500                                                      sharenums, size, canary)
501         self.failUnlessEqual(len(already), 0)
502         self.failUnlessEqual(len(writers), 5)
503         for wb in writers.values():
504             wb.remote_close()
505
506         leases = list(ss.get_leases("si0"))
507         self.failUnlessEqual(len(leases), 1)
508         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
509
510         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
511                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
512         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
513                                                      sharenums, size, canary)
514         for wb in writers.values():
515             wb.remote_close()
516
517         # take out a second lease on si1
518         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
521                                                      sharenums, size, canary)
522         self.failUnlessEqual(len(already), 5)
523         self.failUnlessEqual(len(writers), 0)
524
525         leases = list(ss.get_leases("si1"))
526         self.failUnlessEqual(len(leases), 2)
527         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
528
529         # and a third lease, using add-lease
530         rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
531                      hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
532         ss.remote_add_lease("si1", rs2a, cs2a)
533         leases = list(ss.get_leases("si1"))
534         self.failUnlessEqual(len(leases), 3)
535         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
536
537         # add-lease on a missing storage index is silently ignored
538         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
539
540         # check that si0 is readable
541         readers = ss.remote_get_buckets("si0")
542         self.failUnlessEqual(len(readers), 5)
543
544         # renew the first lease. Only the proper renew_secret should work
545         ss.remote_renew_lease("si0", rs0)
546         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
547         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
548
549         # check that si0 is still readable
550         readers = ss.remote_get_buckets("si0")
551         self.failUnlessEqual(len(readers), 5)
552
553         # now cancel it
554         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
555         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
556         ss.remote_cancel_lease("si0", cs0)
557
558         # si0 should now be gone
559         readers = ss.remote_get_buckets("si0")
560         self.failUnlessEqual(len(readers), 0)
561         # and the renew should no longer work
562         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
563
564
565         # cancel the first lease on si1, leaving the second and third in place
566         ss.remote_cancel_lease("si1", cs1)
567         readers = ss.remote_get_buckets("si1")
568         self.failUnlessEqual(len(readers), 5)
569         # the corresponding renew should no longer work
570         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
571
572         leases = list(ss.get_leases("si1"))
573         self.failUnlessEqual(len(leases), 2)
574         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
575
576         ss.remote_renew_lease("si1", rs2)
577         # cancelling the second and third should make it go away
578         ss.remote_cancel_lease("si1", cs2)
579         ss.remote_cancel_lease("si1", cs2a)
580         readers = ss.remote_get_buckets("si1")
581         self.failUnlessEqual(len(readers), 0)
582         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
583         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
584         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
585
586         leases = list(ss.get_leases("si1"))
587         self.failUnlessEqual(len(leases), 0)
588
589
590         # test overlapping uploads
591         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
592                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
593         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
594                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
595         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
596                                                      sharenums, size, canary)
597         self.failUnlessEqual(len(already), 0)
598         self.failUnlessEqual(len(writers), 5)
599         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
600                                                        sharenums, size, canary)
601         self.failUnlessEqual(len(already2), 0)
602         self.failUnlessEqual(len(writers2), 0)
603         for wb in writers.values():
604             wb.remote_close()
605
606         leases = list(ss.get_leases("si3"))
607         self.failUnlessEqual(len(leases), 1)
608
609         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
610                                                        sharenums, size, canary)
611         self.failUnlessEqual(len(already3), 5)
612         self.failUnlessEqual(len(writers3), 0)
613
614         leases = list(ss.get_leases("si3"))
615         self.failUnlessEqual(len(leases), 2)
616
617     def test_readonly(self):
618         workdir = self.workdir("test_readonly")
619         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
620         ss.setServiceParent(self.sparent)
621
622         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
623         self.failUnlessEqual(already, set())
624         self.failUnlessEqual(writers, {})
625
626         stats = ss.get_stats()
627         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
628                              False)
629         if "storage_server.disk_avail" in stats:
630             # windows does not have os.statvfs, so it doesn't give us disk
631             # stats. But if there are stats, readonly_storage means
632             # disk_avail=0
633             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
634
635     def test_discard(self):
636         # discard is really only used for other tests, but we test it anyways
637         workdir = self.workdir("test_discard")
638         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
639         ss.setServiceParent(self.sparent)
640
641         canary = FakeCanary()
642         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
643         self.failUnlessEqual(already, set())
644         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
645         for i,wb in writers.items():
646             wb.remote_write(0, "%25d" % i)
647             wb.remote_close()
648         # since we discard the data, the shares should be present but sparse.
649         # Since we write with some seeks, the data we read back will be all
650         # zeros.
651         b = ss.remote_get_buckets("vid")
652         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
653         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
654
655     def test_advise_corruption(self):
656         workdir = self.workdir("test_advise_corruption")
657         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
658         ss.setServiceParent(self.sparent)
659
660         si0_s = base32.b2a("si0")
661         ss.remote_advise_corrupt_share("immutable", "si0", 0,
662                                        "This share smells funny.\n")
663         reportdir = os.path.join(workdir, "corruption-advisories")
664         reports = os.listdir(reportdir)
665         self.failUnlessEqual(len(reports), 1)
666         report_si0 = reports[0]
667         self.failUnless(si0_s in report_si0, report_si0)
668         f = open(os.path.join(reportdir, report_si0), "r")
669         report = f.read()
670         f.close()
671         self.failUnless("type: immutable" in report)
672         self.failUnless(("storage_index: %s" % si0_s) in report)
673         self.failUnless("share_number: 0" in report)
674         self.failUnless("This share smells funny." in report)
675
676         # test the RIBucketWriter version too
677         si1_s = base32.b2a("si1")
678         already,writers = self.allocate(ss, "si1", [1], 75)
679         self.failUnlessEqual(already, set())
680         self.failUnlessEqual(set(writers.keys()), set([1]))
681         writers[1].remote_write(0, "data")
682         writers[1].remote_close()
683
684         b = ss.remote_get_buckets("si1")
685         self.failUnlessEqual(set(b.keys()), set([1]))
686         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
687
688         reports = os.listdir(reportdir)
689         self.failUnlessEqual(len(reports), 2)
690         report_si1 = [r for r in reports if si1_s in r][0]
691         f = open(os.path.join(reportdir, report_si1), "r")
692         report = f.read()
693         f.close()
694         self.failUnless("type: immutable" in report)
695         self.failUnless(("storage_index: %s" % si1_s) in report)
696         self.failUnless("share_number: 1" in report)
697         self.failUnless("This share tastes like dust." in report)
698
699
700
701 class MutableServer(unittest.TestCase):
702
703     def setUp(self):
704         self.sparent = LoggingServiceParent()
705         self._lease_secret = itertools.count()
706     def tearDown(self):
707         return self.sparent.stopService()
708
709     def workdir(self, name):
710         basedir = os.path.join("storage", "MutableServer", name)
711         return basedir
712
713     def create(self, name):
714         workdir = self.workdir(name)
715         ss = StorageServer(workdir, "\x00" * 20)
716         ss.setServiceParent(self.sparent)
717         return ss
718
719     def test_create(self):
720         ss = self.create("test_create")
721
722     def write_enabler(self, we_tag):
723         return hashutil.tagged_hash("we_blah", we_tag)
724
725     def renew_secret(self, tag):
726         return hashutil.tagged_hash("renew_blah", str(tag))
727
728     def cancel_secret(self, tag):
729         return hashutil.tagged_hash("cancel_blah", str(tag))
730
731     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
732         write_enabler = self.write_enabler(we_tag)
733         renew_secret = self.renew_secret(lease_tag)
734         cancel_secret = self.cancel_secret(lease_tag)
735         rstaraw = ss.remote_slot_testv_and_readv_and_writev
736         testandwritev = dict( [ (shnum, ([], [], None) )
737                          for shnum in sharenums ] )
738         readv = []
739         rc = rstaraw(storage_index,
740                      (write_enabler, renew_secret, cancel_secret),
741                      testandwritev,
742                      readv)
743         (did_write, readv_data) = rc
744         self.failUnless(did_write)
745         self.failUnless(isinstance(readv_data, dict))
746         self.failUnlessEqual(len(readv_data), 0)
747
748     def test_bad_magic(self):
749         ss = self.create("test_bad_magic")
750         self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
751         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
752         f = open(fn, "rb+")
753         f.seek(0)
754         f.write("BAD MAGIC")
755         f.close()
756         read = ss.remote_slot_readv
757         e = self.failUnlessRaises(UnknownMutableContainerVersionError,
758                                   read, "si1", [0], [(0,10)])
759         self.failUnless(" had magic " in str(e), e)
760         self.failUnless(" but we wanted " in str(e), e)
761
762     def test_container_size(self):
763         ss = self.create("test_container_size")
764         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
765                       set([0,1,2]), 100)
766         read = ss.remote_slot_readv
767         rstaraw = ss.remote_slot_testv_and_readv_and_writev
768         secrets = ( self.write_enabler("we1"),
769                     self.renew_secret("we1"),
770                     self.cancel_secret("we1") )
771         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
772         answer = rstaraw("si1", secrets,
773                          {0: ([], [(0,data)], len(data)+12)},
774                          [])
775         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
776
777         # trying to make the container too large will raise an exception
778         TOOBIG = MutableShareFile.MAX_SIZE + 10
779         self.failUnlessRaises(DataTooLargeError,
780                               rstaraw, "si1", secrets,
781                               {0: ([], [(0,data)], TOOBIG)},
782                               [])
783
784         # it should be possible to make the container smaller, although at
785         # the moment this doesn't actually affect the share, unless the
786         # container size is dropped to zero, in which case the share is
787         # deleted.
788         answer = rstaraw("si1", secrets,
789                          {0: ([], [(0,data)], len(data)+8)},
790                          [])
791         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
792
793         answer = rstaraw("si1", secrets,
794                          {0: ([], [(0,data)], 0)},
795                          [])
796         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
797
798         read_answer = read("si1", [0], [(0,10)])
799         self.failUnlessEqual(read_answer, {})
800
801     def test_allocate(self):
802         ss = self.create("test_allocate")
803         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
804                       set([0,1,2]), 100)
805
806         read = ss.remote_slot_readv
807         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
808                              {0: [""]})
809         self.failUnlessEqual(read("si1", [], [(0, 10)]),
810                              {0: [""], 1: [""], 2: [""]})
811         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
812                              {0: [""]})
813
814         # try writing to one
815         secrets = ( self.write_enabler("we1"),
816                     self.renew_secret("we1"),
817                     self.cancel_secret("we1") )
818         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
819         write = ss.remote_slot_testv_and_readv_and_writev
820         answer = write("si1", secrets,
821                        {0: ([], [(0,data)], None)},
822                        [])
823         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
824
825         self.failUnlessEqual(read("si1", [0], [(0,20)]),
826                              {0: ["00000000001111111111"]})
827         self.failUnlessEqual(read("si1", [0], [(95,10)]),
828                              {0: ["99999"]})
829         #self.failUnlessEqual(s0.remote_get_length(), 100)
830
831         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
832         f = self.failUnlessRaises(BadWriteEnablerError,
833                                   write, "si1", bad_secrets,
834                                   {}, [])
835         self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
836
837         # this testv should fail
838         answer = write("si1", secrets,
839                        {0: ([(0, 12, "eq", "444444444444"),
840                              (20, 5, "eq", "22222"),
841                              ],
842                             [(0, "x"*100)],
843                             None),
844                         },
845                        [(0,12), (20,5)],
846                        )
847         self.failUnlessEqual(answer, (False,
848                                       {0: ["000000000011", "22222"],
849                                        1: ["", ""],
850                                        2: ["", ""],
851                                        }))
852         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
853
854         # as should this one
855         answer = write("si1", secrets,
856                        {0: ([(10, 5, "lt", "11111"),
857                              ],
858                             [(0, "x"*100)],
859                             None),
860                         },
861                        [(10,5)],
862                        )
863         self.failUnlessEqual(answer, (False,
864                                       {0: ["11111"],
865                                        1: [""],
866                                        2: [""]},
867                                       ))
868         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
869
870
871     def test_operators(self):
872         # test operators, the data we're comparing is '11111' in all cases.
873         # test both fail+pass, reset data after each one.
874         ss = self.create("test_operators")
875
876         secrets = ( self.write_enabler("we1"),
877                     self.renew_secret("we1"),
878                     self.cancel_secret("we1") )
879         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
880         write = ss.remote_slot_testv_and_readv_and_writev
881         read = ss.remote_slot_readv
882
883         def reset():
884             write("si1", secrets,
885                   {0: ([], [(0,data)], None)},
886                   [])
887
888         reset()
889
890         #  lt
891         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
892                                              ],
893                                             [(0, "x"*100)],
894                                             None,
895                                             )}, [(10,5)])
896         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
899         reset()
900
901         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
902                                              ],
903                                             [(0, "x"*100)],
904                                             None,
905                                             )}, [(10,5)])
906         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
908         reset()
909
910         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
911                                              ],
912                                             [(0, "y"*100)],
913                                             None,
914                                             )}, [(10,5)])
915         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
917         reset()
918
919         #  le
920         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
921                                              ],
922                                             [(0, "x"*100)],
923                                             None,
924                                             )}, [(10,5)])
925         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
927         reset()
928
929         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
930                                              ],
931                                             [(0, "y"*100)],
932                                             None,
933                                             )}, [(10,5)])
934         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
936         reset()
937
938         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
939                                              ],
940                                             [(0, "y"*100)],
941                                             None,
942                                             )}, [(10,5)])
943         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
945         reset()
946
947         #  eq
948         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
949                                              ],
950                                             [(0, "x"*100)],
951                                             None,
952                                             )}, [(10,5)])
953         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
955         reset()
956
957         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
958                                              ],
959                                             [(0, "y"*100)],
960                                             None,
961                                             )}, [(10,5)])
962         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
964         reset()
965
966         #  ne
967         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
968                                              ],
969                                             [(0, "x"*100)],
970                                             None,
971                                             )}, [(10,5)])
972         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
974         reset()
975
976         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
977                                              ],
978                                             [(0, "y"*100)],
979                                             None,
980                                             )}, [(10,5)])
981         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
983         reset()
984
985         #  ge
986         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
987                                              ],
988                                             [(0, "y"*100)],
989                                             None,
990                                             )}, [(10,5)])
991         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
993         reset()
994
995         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
996                                              ],
997                                             [(0, "y"*100)],
998                                             None,
999                                             )}, [(10,5)])
1000         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1002         reset()
1003
1004         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1005                                              ],
1006                                             [(0, "y"*100)],
1007                                             None,
1008                                             )}, [(10,5)])
1009         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1011         reset()
1012
1013         #  gt
1014         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1015                                              ],
1016                                             [(0, "y"*100)],
1017                                             None,
1018                                             )}, [(10,5)])
1019         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1021         reset()
1022
1023         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1024                                              ],
1025                                             [(0, "x"*100)],
1026                                             None,
1027                                             )}, [(10,5)])
1028         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1030         reset()
1031
1032         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1033                                              ],
1034                                             [(0, "x"*100)],
1035                                             None,
1036                                             )}, [(10,5)])
1037         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1039         reset()
1040
1041         # finally, test some operators against empty shares
1042         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1043                                              ],
1044                                             [(0, "x"*100)],
1045                                             None,
1046                                             )}, [(10,5)])
1047         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1049         reset()
1050
1051     def test_readv(self):
1052         ss = self.create("test_readv")
1053         secrets = ( self.write_enabler("we1"),
1054                     self.renew_secret("we1"),
1055                     self.cancel_secret("we1") )
1056         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1057         write = ss.remote_slot_testv_and_readv_and_writev
1058         read = ss.remote_slot_readv
1059         data = [("%d" % i) * 100 for i in range(3)]
1060         rc = write("si1", secrets,
1061                    {0: ([], [(0,data[0])], None),
1062                     1: ([], [(0,data[1])], None),
1063                     2: ([], [(0,data[2])], None),
1064                     }, [])
1065         self.failUnlessEqual(rc, (True, {}))
1066
1067         answer = read("si1", [], [(0, 10)])
1068         self.failUnlessEqual(answer, {0: ["0"*10],
1069                                       1: ["1"*10],
1070                                       2: ["2"*10]})
1071
1072     def compare_leases_without_timestamps(self, leases_a, leases_b):
1073         self.failUnlessEqual(len(leases_a), len(leases_b))
1074         for i in range(len(leases_a)):
1075             a = leases_a[i]
1076             b = leases_b[i]
1077             self.failUnlessEqual(a.owner_num,       b.owner_num)
1078             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1079             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1080             self.failUnlessEqual(a.nodeid,          b.nodeid)
1081
1082     def compare_leases(self, leases_a, leases_b):
1083         self.failUnlessEqual(len(leases_a), len(leases_b))
1084         for i in range(len(leases_a)):
1085             a = leases_a[i]
1086             b = leases_b[i]
1087             self.failUnlessEqual(a.owner_num,       b.owner_num)
1088             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1089             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1090             self.failUnlessEqual(a.nodeid,          b.nodeid)
1091             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1092
1093     def test_leases(self):
1094         ss = self.create("test_leases")
1095         def secrets(n):
1096             return ( self.write_enabler("we1"),
1097                      self.renew_secret("we1-%d" % n),
1098                      self.cancel_secret("we1-%d" % n) )
1099         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100         write = ss.remote_slot_testv_and_readv_and_writev
1101         read = ss.remote_slot_readv
1102         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103         self.failUnlessEqual(rc, (True, {}))
1104
1105         # create a random non-numeric file in the bucket directory, to
1106         # exercise the code that's supposed to ignore those.
1107         bucket_dir = os.path.join(self.workdir("test_leases"),
1108                                   "shares", storage_index_to_dir("si1"))
1109         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110         f.write("you ought to be ignoring me\n")
1111         f.close()
1112
1113         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1115
1116         # add-lease on a missing storage index is silently ignored
1117         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1118
1119         # re-allocate the slots and use the same secrets, that should update
1120         # the lease
1121         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1123
1124         # renew it directly
1125         ss.remote_renew_lease("si1", secrets(0)[1])
1126         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1127
1128         # now allocate them with a bunch of different secrets, to trigger the
1129         # extended lease code. Use add_lease for one of them.
1130         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132         secrets2 = secrets(2)
1133         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1138
1139         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1140
1141         # cancel one of them
1142         ss.remote_cancel_lease("si1", secrets(5)[2])
1143         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1144
1145         all_leases = list(s0.get_leases())
1146         # and write enough data to expand the container, forcing the server
1147         # to move the leases
1148         write("si1", secrets(0),
1149               {0: ([], [(0,data)], 200), },
1150               [])
1151
1152         # read back the leases, make sure they're still intact.
1153         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1154
1155         ss.remote_renew_lease("si1", secrets(0)[1])
1156         ss.remote_renew_lease("si1", secrets(1)[1])
1157         ss.remote_renew_lease("si1", secrets(2)[1])
1158         ss.remote_renew_lease("si1", secrets(3)[1])
1159         ss.remote_renew_lease("si1", secrets(4)[1])
1160         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161         # get a new copy of the leases, with the current timestamps. Reading
1162         # data and failing to renew/cancel leases should leave the timestamps
1163         # alone.
1164         all_leases = list(s0.get_leases())
1165         # renewing with a bogus token should prompt an error message
1166
1167         # examine the exception thus raised, make sure the old nodeid is
1168         # present, to provide for share migration
1169         e = self.failUnlessRaises(IndexError,
1170                                   ss.remote_renew_lease, "si1",
1171                                   secrets(20)[1])
1172         e_s = str(e)
1173         self.failUnless("Unable to renew non-existent lease" in e_s)
1174         self.failUnless("I have leases accepted by nodeids:" in e_s)
1175         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1176
1177         # same for cancelling
1178         self.failUnlessRaises(IndexError,
1179                               ss.remote_cancel_lease, "si1",
1180                               secrets(20)[2])
1181         self.compare_leases(all_leases, list(s0.get_leases()))
1182
1183         # reading shares should not modify the timestamp
1184         read("si1", [], [(0,200)])
1185         self.compare_leases(all_leases, list(s0.get_leases()))
1186
1187         write("si1", secrets(0),
1188               {0: ([], [(200, "make me bigger")], None)}, [])
1189         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1190
1191         write("si1", secrets(0),
1192               {0: ([], [(500, "make me really bigger")], None)}, [])
1193         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1194
1195         # now cancel them all
1196         ss.remote_cancel_lease("si1", secrets(0)[2])
1197         ss.remote_cancel_lease("si1", secrets(1)[2])
1198         ss.remote_cancel_lease("si1", secrets(2)[2])
1199         ss.remote_cancel_lease("si1", secrets(3)[2])
1200
1201         # the slot should still be there
1202         remaining_shares = read("si1", [], [(0,10)])
1203         self.failUnlessEqual(len(remaining_shares), 1)
1204         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1205
1206         # cancelling a non-existent lease should raise an IndexError
1207         self.failUnlessRaises(IndexError,
1208                               ss.remote_cancel_lease, "si1", "nonsecret")
1209
1210         # and the slot should still be there
1211         remaining_shares = read("si1", [], [(0,10)])
1212         self.failUnlessEqual(len(remaining_shares), 1)
1213         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1214
1215         ss.remote_cancel_lease("si1", secrets(4)[2])
1216         # now the slot should be gone
1217         no_shares = read("si1", [], [(0,10)])
1218         self.failUnlessEqual(no_shares, {})
1219
1220         # cancelling a lease on a non-existent share should raise an IndexError
1221         self.failUnlessRaises(IndexError,
1222                               ss.remote_cancel_lease, "si2", "nonsecret")
1223
1224     def test_remove(self):
1225         ss = self.create("test_remove")
1226         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1227                       set([0,1,2]), 100)
1228         readv = ss.remote_slot_readv
1229         writev = ss.remote_slot_testv_and_readv_and_writev
1230         secrets = ( self.write_enabler("we1"),
1231                     self.renew_secret("we1"),
1232                     self.cancel_secret("we1") )
1233         # delete sh0 by setting its size to zero
1234         answer = writev("si1", secrets,
1235                         {0: ([], [], 0)},
1236                         [])
1237         # the answer should mention all the shares that existed before the
1238         # write
1239         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240         # but a new read should show only sh1 and sh2
1241         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1242                              {1: [""], 2: [""]})
1243
1244         # delete sh1 by setting its size to zero
1245         answer = writev("si1", secrets,
1246                         {1: ([], [], 0)},
1247                         [])
1248         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1250                              {2: [""]})
1251
1252         # delete sh2 by setting its size to zero
1253         answer = writev("si1", secrets,
1254                         {2: ([], [], 0)},
1255                         [])
1256         self.failUnlessEqual(answer, (True, {2:[]}) )
1257         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258                              {})
1259         # and the bucket directory should now be gone
1260         si = base32.b2a("si1")
1261         # note: this is a detail of the storage server implementation, and
1262         # may change in the future
1263         prefix = si[:2]
1264         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265         bucketdir = os.path.join(prefixdir, si)
1266         self.failUnless(os.path.exists(prefixdir))
1267         self.failIf(os.path.exists(bucketdir))
1268
1269 class Stats(unittest.TestCase):
1270
1271     def setUp(self):
1272         self.sparent = LoggingServiceParent()
1273         self._lease_secret = itertools.count()
1274     def tearDown(self):
1275         return self.sparent.stopService()
1276
1277     def workdir(self, name):
1278         basedir = os.path.join("storage", "Server", name)
1279         return basedir
1280
1281     def create(self, name):
1282         workdir = self.workdir(name)
1283         ss = StorageServer(workdir, "\x00" * 20)
1284         ss.setServiceParent(self.sparent)
1285         return ss
1286
1287     def test_latencies(self):
1288         ss = self.create("test_latencies")
1289         for i in range(10000):
1290             ss.add_latency("allocate", 1.0 * i)
1291         for i in range(1000):
1292             ss.add_latency("renew", 1.0 * i)
1293         for i in range(10):
1294             ss.add_latency("cancel", 2.0 * i)
1295         ss.add_latency("get", 5.0)
1296
1297         output = ss.get_latencies()
1298
1299         self.failUnlessEqual(sorted(output.keys()),
1300                              sorted(["allocate", "renew", "cancel", "get"]))
1301         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1310
1311         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1314         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1320
1321         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1324         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1325         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1330
1331         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1340
1341 def remove_tags(s):
1342     s = re.sub(r'<[^>]*>', ' ', s)
1343     s = re.sub(r'\s+', ' ', s)
1344     return s
1345
1346 class MyBucketCountingCrawler(BucketCountingCrawler):
1347     def finished_prefix(self, cycle, prefix):
1348         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1349         if self.hook_ds:
1350             d = self.hook_ds.pop(0)
1351             d.callback(None)
1352
1353 class MyStorageServer(StorageServer):
1354     def add_bucket_counter(self):
1355         statefile = os.path.join(self.storedir, "bucket_counter.state")
1356         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1357         self.bucket_counter.setServiceParent(self)
1358
1359 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1360
1361     def setUp(self):
1362         self.s = service.MultiService()
1363         self.s.startService()
1364     def tearDown(self):
1365         return self.s.stopService()
1366
1367     def test_bucket_counter(self):
1368         basedir = "storage/BucketCounter/bucket_counter"
1369         fileutil.make_dirs(basedir)
1370         ss = StorageServer(basedir, "\x00" * 20)
1371         # to make sure we capture the bucket-counting-crawler in the middle
1372         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1373         # also make it start sooner than usual.
1374         ss.bucket_counter.slow_start = 0
1375         orig_cpu_slice = ss.bucket_counter.cpu_slice
1376         ss.bucket_counter.cpu_slice = 0
1377         ss.setServiceParent(self.s)
1378
1379         w = StorageStatus(ss)
1380
1381         # this sample is before the crawler has started doing anything
1382         html = w.renderSynchronously()
1383         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384         s = remove_tags(html)
1385         self.failUnless("Accepting new shares: Yes" in s, s)
1386         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387         self.failUnless("Total buckets: Not computed yet" in s, s)
1388         self.failUnless("Next crawl in" in s, s)
1389
1390         # give the bucket-counting-crawler one tick to get started. The
1391         # cpu_slice=0 will force it to yield right after it processes the
1392         # first prefix
1393
1394         d = eventual.fireEventually()
1395         def _check(ignored):
1396             # are we really right after the first prefix?
1397             state = ss.bucket_counter.get_state()
1398             self.failUnlessEqual(state["last-complete-prefix"],
1399                                  ss.bucket_counter.prefixes[0])
1400             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401             html = w.renderSynchronously()
1402             s = remove_tags(html)
1403             self.failUnless(" Current crawl " in s, s)
1404             self.failUnless(" (next work in " in s, s)
1405         d.addCallback(_check)
1406
1407         # now give it enough time to complete a full cycle
1408         def _watch():
1409             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410         d.addCallback(lambda ignored: self.poll(_watch))
1411         def _check2(ignored):
1412             ss.bucket_counter.cpu_slice = orig_cpu_slice
1413             html = w.renderSynchronously()
1414             s = remove_tags(html)
1415             self.failUnless("Total buckets: 0 (the number of" in s, s)
1416             self.failUnless("Next crawl in 59 minutes" in s, s)
1417         d.addCallback(_check2)
1418         return d
1419
1420     def test_bucket_counter_cleanup(self):
1421         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422         fileutil.make_dirs(basedir)
1423         ss = StorageServer(basedir, "\x00" * 20)
1424         # to make sure we capture the bucket-counting-crawler in the middle
1425         # of a cycle, we reach in and reduce its maximum slice time to 0.
1426         ss.bucket_counter.slow_start = 0
1427         orig_cpu_slice = ss.bucket_counter.cpu_slice
1428         ss.bucket_counter.cpu_slice = 0
1429         ss.setServiceParent(self.s)
1430
1431         d = eventual.fireEventually()
1432
1433         def _after_first_prefix(ignored):
1434             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435             # now sneak in and mess with its state, to make sure it cleans up
1436             # properly at the end of the cycle
1437             state = ss.bucket_counter.state
1438             self.failUnlessEqual(state["last-complete-prefix"],
1439                                  ss.bucket_counter.prefixes[0])
1440             state["bucket-counts"][-12] = {}
1441             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442             ss.bucket_counter.save_state()
1443         d.addCallback(_after_first_prefix)
1444
1445         # now give it enough time to complete a cycle
1446         def _watch():
1447             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448         d.addCallback(lambda ignored: self.poll(_watch))
1449         def _check2(ignored):
1450             ss.bucket_counter.cpu_slice = orig_cpu_slice
1451             s = ss.bucket_counter.get_state()
1452             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453             self.failIf("bogusprefix!" in s["storage-index-samples"],
1454                         s["storage-index-samples"].keys())
1455         d.addCallback(_check2)
1456         return d
1457
1458     def test_bucket_counter_eta(self):
1459         basedir = "storage/BucketCounter/bucket_counter_eta"
1460         fileutil.make_dirs(basedir)
1461         ss = MyStorageServer(basedir, "\x00" * 20)
1462         ss.bucket_counter.slow_start = 0
1463         # these will be fired inside finished_prefix()
1464         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465         w = StorageStatus(ss)
1466
1467         d = defer.Deferred()
1468
1469         def _check_1(ignored):
1470             # no ETA is available yet
1471             html = w.renderSynchronously()
1472             s = remove_tags(html)
1473             self.failUnlessIn("complete (next work", s)
1474
1475         def _check_2(ignored):
1476             # one prefix has finished, so an ETA based upon that elapsed time
1477             # should be available.
1478             html = w.renderSynchronously()
1479             s = remove_tags(html)
1480             self.failUnlessIn("complete (ETA ", s)
1481
1482         def _check_3(ignored):
1483             # two prefixes have finished
1484             html = w.renderSynchronously()
1485             s = remove_tags(html)
1486             self.failUnlessIn("complete (ETA ", s)
1487             d.callback("done")
1488
1489         hooks[0].addCallback(_check_1).addErrback(d.errback)
1490         hooks[1].addCallback(_check_2).addErrback(d.errback)
1491         hooks[2].addCallback(_check_3).addErrback(d.errback)
1492
1493         ss.setServiceParent(self.s)
1494         return d
1495
1496 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1497     stop_after_first_bucket = False
1498     def process_bucket(self, *args, **kwargs):
1499         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1500         if self.stop_after_first_bucket:
1501             self.stop_after_first_bucket = False
1502             self.cpu_slice = -1.0
1503     def yielding(self, sleep_time):
1504         if not self.stop_after_first_bucket:
1505             self.cpu_slice = 500
1506
1507 class BrokenStatResults:
1508     pass
1509 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1510     def stat(self, fn):
1511         s = os.stat(fn)
1512         bsr = BrokenStatResults()
1513         for attrname in dir(s):
1514             if attrname.startswith("_"):
1515                 continue
1516             if attrname == "st_blocks":
1517                 continue
1518             setattr(bsr, attrname, getattr(s, attrname))
1519         return bsr
1520
1521 class InstrumentedStorageServer(StorageServer):
1522     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1523 class No_ST_BLOCKS_StorageServer(StorageServer):
1524     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1525
1526 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1527
1528     def setUp(self):
1529         self.s = service.MultiService()
1530         self.s.startService()
1531     def tearDown(self):
1532         return self.s.stopService()
1533
1534     def make_shares(self, ss):
1535         def make(si):
1536             return (si, hashutil.tagged_hash("renew", si),
1537                     hashutil.tagged_hash("cancel", si))
1538         def make_mutable(si):
1539             return (si, hashutil.tagged_hash("renew", si),
1540                     hashutil.tagged_hash("cancel", si),
1541                     hashutil.tagged_hash("write-enabler", si))
1542         def make_extra_lease(si, num):
1543             return (hashutil.tagged_hash("renew-%d" % num, si),
1544                     hashutil.tagged_hash("cancel-%d" % num, si))
1545
1546         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1552         sharenums = [0]
1553         canary = FakeCanary()
1554         # note: 'tahoe debug dump-share' will not handle this file, since the
1555         # inner contents are not a valid CHK share
1556         data = "\xff" * 1000
1557
1558         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1559                                          1000, canary)
1560         w[0].remote_write(0, data)
1561         w[0].remote_close()
1562
1563         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1564                                          1000, canary)
1565         w[0].remote_write(0, data)
1566         w[0].remote_close()
1567         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1568
1569         writev = ss.remote_slot_testv_and_readv_and_writev
1570         writev(mutable_si_2, (we2, rs2, cs2),
1571                {0: ([], [(0,data)], len(data))}, [])
1572         writev(mutable_si_3, (we3, rs3, cs3),
1573                {0: ([], [(0,data)], len(data))}, [])
1574         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1575
1576         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1579
1580     def test_basic(self):
1581         basedir = "storage/LeaseCrawler/basic"
1582         fileutil.make_dirs(basedir)
1583         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584         # make it start sooner than usual.
1585         lc = ss.lease_checker
1586         lc.slow_start = 0
1587         lc.cpu_slice = 500
1588         lc.stop_after_first_bucket = True
1589         webstatus = StorageStatus(ss)
1590
1591         # create a few shares, with some leases on them
1592         self.make_shares(ss)
1593         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1594
1595         # add a non-sharefile to exercise another code path
1596         fn = os.path.join(ss.sharedir,
1597                           storage_index_to_dir(immutable_si_0),
1598                           "not-a-share")
1599         f = open(fn, "wb")
1600         f.write("I am not a share.\n")
1601         f.close()
1602
1603         # this is before the crawl has started, so we're not in a cycle yet
1604         initial_state = lc.get_state()
1605         self.failIf(lc.get_progress()["cycle-in-progress"])
1606         self.failIf("cycle-to-date" in initial_state)
1607         self.failIf("estimated-remaining-cycle" in initial_state)
1608         self.failIf("estimated-current-cycle" in initial_state)
1609         self.failUnless("history" in initial_state)
1610         self.failUnlessEqual(initial_state["history"], {})
1611
1612         ss.setServiceParent(self.s)
1613
1614         DAY = 24*60*60
1615
1616         d = eventual.fireEventually()
1617
1618         # now examine the state right after the first bucket has been
1619         # processed.
1620         def _after_first_bucket(ignored):
1621             initial_state = lc.get_state()
1622             self.failUnless("cycle-to-date" in initial_state)
1623             self.failUnless("estimated-remaining-cycle" in initial_state)
1624             self.failUnless("estimated-current-cycle" in initial_state)
1625             self.failUnless("history" in initial_state)
1626             self.failUnlessEqual(initial_state["history"], {})
1627
1628             so_far = initial_state["cycle-to-date"]
1629             self.failUnlessEqual(so_far["expiration-enabled"], False)
1630             self.failUnless("configured-expiration-mode" in so_far)
1631             self.failUnless("lease-age-histogram" in so_far)
1632             lah = so_far["lease-age-histogram"]
1633             self.failUnlessEqual(type(lah), list)
1634             self.failUnlessEqual(len(lah), 1)
1635             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637             self.failUnlessEqual(so_far["buckets-examined"], 1)
1638             self.failUnlessEqual(so_far["shares-examined"], 1)
1639             self.failUnlessEqual(so_far["corrupt-shares"], [])
1640             sr1 = so_far["space-recovered"]
1641             self.failUnlessEqual(sr1["actual-numshares"], 0)
1642             self.failUnlessEqual(sr1["configured-leasetimer-diskbytes"], 0)
1643             self.failUnlessEqual(sr1["original-leasetimer-sharebytes"], 0)
1644             left = initial_state["estimated-remaining-cycle"]
1645             self.failUnless(left["buckets-examined"] > 0,
1646                             left["buckets-examined"])
1647             self.failUnless(left["shares-examined"] > 0,
1648                             left["shares-examined"])
1649             sr2 = left["space-recovered"]
1650             self.failIfEqual(sr2["actual-numshares"], None)
1651             self.failIfEqual(sr2["configured-leasetimer-diskbytes"], None)
1652             self.failIfEqual(sr2["original-leasetimer-sharebytes"], None)
1653         d.addCallback(_after_first_bucket)
1654         d.addCallback(lambda ign: self.render1(webstatus))
1655         def _check_html_in_cycle(html):
1656             s = remove_tags(html)
1657             self.failUnlessIn("So far, this cycle has examined "
1658                               "1 shares in 1 buckets "
1659                               "and has recovered: "
1660                               "0 shares, 0 buckets, 0 B ", s)
1661             self.failUnlessIn("If expiration were enabled, "
1662                               "we would have recovered: "
1663                               "0 shares, 0 buckets, 0 B by now", s)
1664             self.failUnlessIn("and the remainder of this cycle "
1665                               "would probably recover: "
1666                               "0 shares, 0 buckets, 0 B ", s)
1667             self.failUnlessIn("and the whole cycle would probably recover: "
1668                               "0 shares, 0 buckets, 0 B ", s)
1669             self.failUnlessIn("if we were using each lease's default "
1670                               "31-day lease lifetime", s)
1671             self.failUnlessIn("this cycle would be expected to recover: ", s)
1672         d.addCallback(_check_html_in_cycle)
1673
1674         # wait for the crawler to finish the first cycle. Nothing should have
1675         # been removed.
1676         def _wait():
1677             return bool(lc.get_state()["last-cycle-finished"] is not None)
1678         d.addCallback(lambda ign: self.poll(_wait))
1679
1680         def _after_first_cycle(ignored):
1681             s = lc.get_state()
1682             self.failIf("cycle-to-date" in s)
1683             self.failIf("estimated-remaining-cycle" in s)
1684             self.failIf("estimated-current-cycle" in s)
1685             last = s["history"][0]
1686             self.failUnless("cycle-start-finish-times" in last)
1687             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1688             self.failUnlessEqual(last["expiration-enabled"], False)
1689             self.failUnless("configured-expiration-mode" in last)
1690
1691             self.failUnless("lease-age-histogram" in last)
1692             lah = last["lease-age-histogram"]
1693             self.failUnlessEqual(type(lah), list)
1694             self.failUnlessEqual(len(lah), 1)
1695             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1696
1697             self.failUnlessEqual(last["leases-per-share-histogram"],
1698                                  {1: 2, 2: 2})
1699             self.failUnlessEqual(last["buckets-examined"], 4)
1700             self.failUnlessEqual(last["shares-examined"], 4)
1701             self.failUnlessEqual(last["corrupt-shares"], [])
1702
1703             rec = last["space-recovered"]
1704             self.failUnlessEqual(rec["actual-numbuckets"], 0)
1705             self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0)
1706             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 0)
1707             self.failUnlessEqual(rec["actual-numshares"], 0)
1708             self.failUnlessEqual(rec["original-leasetimer-numshares"], 0)
1709             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 0)
1710             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1711             self.failUnlessEqual(rec["original-leasetimer-diskbytes"], 0)
1712             self.failUnlessEqual(rec["configured-leasetimer-diskbytes"], 0)
1713             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1714             self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0)
1715             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], 0)
1716
1717             def _get_sharefile(si):
1718                 return list(ss._iter_share_files(si))[0]
1719             def count_leases(si):
1720                 return len(list(_get_sharefile(si).get_leases()))
1721             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1722             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1723             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1724             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1725         d.addCallback(_after_first_cycle)
1726         d.addCallback(lambda ign: self.render1(webstatus))
1727         def _check_html(html):
1728             s = remove_tags(html)
1729             self.failUnlessIn("recovered: 0 shares, 0 buckets, 0 B "
1730                               "but expiration was not enabled", s)
1731         d.addCallback(_check_html)
1732         return d
1733
1734     def backdate_lease(self, sf, renew_secret, new_expire_time):
1735         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1736         # "renew" a lease with a new_expire_time that is older than what the
1737         # current lease has), so we have to reach inside it.
1738         for i,lease in enumerate(sf.get_leases()):
1739             if lease.renew_secret == renew_secret:
1740                 lease.expiration_time = new_expire_time
1741                 f = open(sf.home, 'rb+')
1742                 sf._write_lease_record(f, i, lease)
1743                 f.close()
1744                 return
1745         raise IndexError("unable to renew non-existent lease")
1746
1747     def test_expire_age(self):
1748         basedir = "storage/LeaseCrawler/expire_age"
1749         fileutil.make_dirs(basedir)
1750         # setting expiration_time to 2000 means that any lease which is more
1751         # than 2000s old will be expired.
1752         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1753                                        expiration_enabled=True,
1754                                        expiration_mode=("age",2000))
1755         # make it start sooner than usual.
1756         lc = ss.lease_checker
1757         lc.slow_start = 0
1758         lc.stop_after_first_bucket = True
1759         webstatus = StorageStatus(ss)
1760
1761         # create a few shares, with some leases on them
1762         self.make_shares(ss)
1763         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1764
1765         def count_shares(si):
1766             return len(list(ss._iter_share_files(si)))
1767         def _get_sharefile(si):
1768             return list(ss._iter_share_files(si))[0]
1769         def count_leases(si):
1770             return len(list(_get_sharefile(si).get_leases()))
1771
1772         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1773         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1774         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1775         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1776         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1777         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1778         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1779         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1780
1781         # artificially crank back the expiration time on the first lease of
1782         # each share, to make it look like it expired already (age=1000s).
1783         # Some shares have an extra lease which is set to expire at the
1784         # default time in 31 days from now (age=31days). We then run the
1785         # crawler, which will expire the first lease, making some shares get
1786         # deleted and others stay alive (with one remaining lease)
1787         now = time.time()
1788
1789         sf0 = _get_sharefile(immutable_si_0)
1790         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1791         sf0_size = os.stat(sf0.home).st_size
1792
1793         # immutable_si_1 gets an extra lease
1794         sf1 = _get_sharefile(immutable_si_1)
1795         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1796
1797         sf2 = _get_sharefile(mutable_si_2)
1798         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1799         sf2_size = os.stat(sf2.home).st_size
1800
1801         # mutable_si_3 gets an extra lease
1802         sf3 = _get_sharefile(mutable_si_3)
1803         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1804
1805         ss.setServiceParent(self.s)
1806
1807         d = eventual.fireEventually()
1808         # examine the state right after the first bucket has been processed
1809         def _after_first_bucket(ignored):
1810             p = lc.get_progress()
1811             self.failUnless(p["cycle-in-progress"])
1812         d.addCallback(_after_first_bucket)
1813         d.addCallback(lambda ign: self.render1(webstatus))
1814         def _check_html_in_cycle(html):
1815             s = remove_tags(html)
1816             # the first bucket encountered gets deleted, and its prefix
1817             # happens to be about 1/5th of the way through the ring, so the
1818             # predictor thinks we'll have 5 shares and that we'll delete them
1819             # all. This part of the test depends upon the SIs landing right
1820             # where they do now.
1821             self.failUnlessIn("The remainder of this cycle is expected to "
1822                               "recover: 4 shares, 4 buckets", s)
1823             self.failUnlessIn("The whole cycle is expected to examine "
1824                               "5 shares in 5 buckets and to recover: "
1825                               "5 shares, 5 buckets", s)
1826         d.addCallback(_check_html_in_cycle)
1827
1828         # wait for the crawler to finish the first cycle. Two shares should
1829         # have been removed
1830         def _wait():
1831             return bool(lc.get_state()["last-cycle-finished"] is not None)
1832         d.addCallback(lambda ign: self.poll(_wait))
1833
1834         def _after_first_cycle(ignored):
1835             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1836             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1837             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1838             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1839             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1840             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1841
1842             s = lc.get_state()
1843             last = s["history"][0]
1844
1845             self.failUnlessEqual(last["expiration-enabled"], True)
1846             self.failUnlessEqual(last["configured-expiration-mode"],
1847                                  ("age",2000))
1848             self.failUnlessEqual(last["buckets-examined"], 4)
1849             self.failUnlessEqual(last["shares-examined"], 4)
1850             self.failUnlessEqual(last["leases-per-share-histogram"],
1851                                  {1: 2, 2: 2})
1852
1853             rec = last["space-recovered"]
1854             self.failUnlessEqual(rec["actual-numbuckets"], 2)
1855             self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 2)
1856             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2)
1857             self.failUnlessEqual(rec["actual-numshares"], 2)
1858             self.failUnlessEqual(rec["original-leasetimer-numshares"], 2)
1859             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2)
1860             size = sf0_size + sf2_size
1861             self.failUnlessEqual(rec["actual-sharebytes"], size)
1862             self.failUnlessEqual(rec["original-leasetimer-sharebytes"], size)
1863             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size)
1864             # different platforms have different notions of "blocks used by
1865             # this file", so merely assert that it's a number
1866             self.failUnless(rec["actual-diskbytes"] >= 0,
1867                             rec["actual-diskbytes"])
1868             self.failUnless(rec["original-leasetimer-diskbytes"] >= 0,
1869                             rec["original-leasetimer-diskbytes"])
1870             self.failUnless(rec["configured-leasetimer-diskbytes"] >= 0,
1871                             rec["configured-leasetimer-diskbytes"])
1872         d.addCallback(_after_first_cycle)
1873         d.addCallback(lambda ign: self.render1(webstatus))
1874         def _check_html(html):
1875             s = remove_tags(html)
1876             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1877             self.failUnlessIn("leases created or last renewed more than 33 minutes ago will be considered expired", s)
1878             self.failUnlessIn(" recovered: 2 shares, 2 buckets, ", s)
1879         d.addCallback(_check_html)
1880         return d
1881
1882     def test_expire_date_cutoff(self):
1883         basedir = "storage/LeaseCrawler/expire_date_cutoff"
1884         fileutil.make_dirs(basedir)
1885         # setting date-cutoff to 2000 seconds ago means that any lease which
1886         # is more than 2000s old will be expired.
1887         now = time.time()
1888         then = int(now - 2000)
1889         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1890                                        expiration_enabled=True,
1891                                        expiration_mode=("date-cutoff",then))
1892         # make it start sooner than usual.
1893         lc = ss.lease_checker
1894         lc.slow_start = 0
1895         lc.stop_after_first_bucket = True
1896         webstatus = StorageStatus(ss)
1897
1898         # create a few shares, with some leases on them
1899         self.make_shares(ss)
1900         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1901
1902         def count_shares(si):
1903             return len(list(ss._iter_share_files(si)))
1904         def _get_sharefile(si):
1905             return list(ss._iter_share_files(si))[0]
1906         def count_leases(si):
1907             return len(list(_get_sharefile(si).get_leases()))
1908
1909         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1910         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1911         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1912         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1913         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1914         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1915         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1916         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1917
1918         # artificially crank back the expiration time on the first lease of
1919         # each share, to make it look like was renewed 3000s ago. To achieve
1920         # this, we need to set the expiration time to now-3000+31days. This
1921         # will change when the lease format is improved to contain both
1922         # create/renew time and duration.
1923         new_expiration_time = now - 3000 + 31*24*60*60
1924
1925         # Some shares have an extra lease which is set to expire at the
1926         # default time in 31 days from now (age=31days). We then run the
1927         # crawler, which will expire the first lease, making some shares get
1928         # deleted and others stay alive (with one remaining lease)
1929
1930         sf0 = _get_sharefile(immutable_si_0)
1931         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1932         sf0_size = os.stat(sf0.home).st_size
1933
1934         # immutable_si_1 gets an extra lease
1935         sf1 = _get_sharefile(immutable_si_1)
1936         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1937
1938         sf2 = _get_sharefile(mutable_si_2)
1939         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1940         sf2_size = os.stat(sf2.home).st_size
1941
1942         # mutable_si_3 gets an extra lease
1943         sf3 = _get_sharefile(mutable_si_3)
1944         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1945
1946         ss.setServiceParent(self.s)
1947
1948         d = eventual.fireEventually()
1949         # examine the state right after the first bucket has been processed
1950         def _after_first_bucket(ignored):
1951             p = lc.get_progress()
1952             self.failUnless(p["cycle-in-progress"])
1953         d.addCallback(_after_first_bucket)
1954         d.addCallback(lambda ign: self.render1(webstatus))
1955         def _check_html_in_cycle(html):
1956             s = remove_tags(html)
1957             # the first bucket encountered gets deleted, and its prefix
1958             # happens to be about 1/5th of the way through the ring, so the
1959             # predictor thinks we'll have 5 shares and that we'll delete them
1960             # all. This part of the test depends upon the SIs landing right
1961             # where they do now.
1962             self.failUnlessIn("The remainder of this cycle is expected to "
1963                               "recover: 4 shares, 4 buckets", s)
1964             self.failUnlessIn("The whole cycle is expected to examine "
1965                               "5 shares in 5 buckets and to recover: "
1966                               "5 shares, 5 buckets", s)
1967         d.addCallback(_check_html_in_cycle)
1968
1969         # wait for the crawler to finish the first cycle. Two shares should
1970         # have been removed
1971         def _wait():
1972             return bool(lc.get_state()["last-cycle-finished"] is not None)
1973         d.addCallback(lambda ign: self.poll(_wait))
1974
1975         def _after_first_cycle(ignored):
1976             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1977             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1978             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1979             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1980             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1981             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1982
1983             s = lc.get_state()
1984             last = s["history"][0]
1985
1986             self.failUnlessEqual(last["expiration-enabled"], True)
1987             self.failUnlessEqual(last["configured-expiration-mode"],
1988                                  ("date-cutoff",then))
1989             self.failUnlessEqual(last["buckets-examined"], 4)
1990             self.failUnlessEqual(last["shares-examined"], 4)
1991             self.failUnlessEqual(last["leases-per-share-histogram"],
1992                                  {1: 2, 2: 2})
1993
1994             rec = last["space-recovered"]
1995             self.failUnlessEqual(rec["actual-numbuckets"], 2)
1996             self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0)
1997             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2)
1998             self.failUnlessEqual(rec["actual-numshares"], 2)
1999             self.failUnlessEqual(rec["original-leasetimer-numshares"], 0)
2000             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2)
2001             size = sf0_size + sf2_size
2002             self.failUnlessEqual(rec["actual-sharebytes"], size)
2003             self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0)
2004             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size)
2005             # different platforms have different notions of "blocks used by
2006             # this file", so merely assert that it's a number
2007             self.failUnless(rec["actual-diskbytes"] >= 0,
2008                             rec["actual-diskbytes"])
2009             self.failUnless(rec["original-leasetimer-diskbytes"] >= 0,
2010                             rec["original-leasetimer-diskbytes"])
2011             self.failUnless(rec["configured-leasetimer-diskbytes"] >= 0,
2012                             rec["configured-leasetimer-diskbytes"])
2013         d.addCallback(_after_first_cycle)
2014         d.addCallback(lambda ign: self.render1(webstatus))
2015         def _check_html(html):
2016             s = remove_tags(html)
2017             self.failUnlessIn("Expiration Enabled:"
2018                               " expired leases will be removed", s)
2019             date = time.strftime("%d-%b-%Y", time.gmtime(then))
2020             self.failUnlessIn("leases created or last renewed before %s"
2021                               " will be considered expired" % date, s)
2022             self.failUnlessIn(" recovered: 2 shares, 2 buckets, ", s)
2023         d.addCallback(_check_html)
2024         return d
2025
2026     def test_only_immutable(self):
2027         basedir = "storage/LeaseCrawler/only_immutable"
2028         fileutil.make_dirs(basedir)
2029         now = time.time()
2030         then = int(now - 2000)
2031         ss = StorageServer(basedir, "\x00" * 20,
2032                            expiration_enabled=True,
2033                            expiration_mode=("date-cutoff",
2034                                             then, ("immutable",)))
2035         lc = ss.lease_checker
2036         lc.slow_start = 0
2037         webstatus = StorageStatus(ss)
2038
2039         self.make_shares(ss)
2040         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2041         # set all leases to be expirable
2042         new_expiration_time = now - 3000 + 31*24*60*60
2043
2044         def count_shares(si):
2045             return len(list(ss._iter_share_files(si)))
2046         def _get_sharefile(si):
2047             return list(ss._iter_share_files(si))[0]
2048         def count_leases(si):
2049             return len(list(_get_sharefile(si).get_leases()))
2050
2051         sf0 = _get_sharefile(immutable_si_0)
2052         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2053         sf1 = _get_sharefile(immutable_si_1)
2054         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2055         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2056         sf2 = _get_sharefile(mutable_si_2)
2057         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2058         sf3 = _get_sharefile(mutable_si_3)
2059         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2060         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2061
2062         ss.setServiceParent(self.s)
2063         def _wait():
2064             return bool(lc.get_state()["last-cycle-finished"] is not None)
2065         d = self.poll(_wait)
2066
2067         def _after_first_cycle(ignored):
2068             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2069             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2070             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2071             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2072             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2073             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2074         d.addCallback(_after_first_cycle)
2075         d.addCallback(lambda ign: self.render1(webstatus))
2076         def _check_html(html):
2077             s = remove_tags(html)
2078             self.failUnlessIn("only the following sharetypes will be expired: immutable Next crawl", s)
2079         d.addCallback(_check_html)
2080         return d
2081
2082     def test_only_mutable(self):
2083         basedir = "storage/LeaseCrawler/only_mutable"
2084         fileutil.make_dirs(basedir)
2085         now = time.time()
2086         then = int(now - 2000)
2087         ss = StorageServer(basedir, "\x00" * 20,
2088                            expiration_enabled=True,
2089                            expiration_mode=("date-cutoff",
2090                                             then, ("mutable",)))
2091         lc = ss.lease_checker
2092         lc.slow_start = 0
2093         webstatus = StorageStatus(ss)
2094
2095         self.make_shares(ss)
2096         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2097         # set all leases to be expirable
2098         new_expiration_time = now - 3000 + 31*24*60*60
2099
2100         def count_shares(si):
2101             return len(list(ss._iter_share_files(si)))
2102         def _get_sharefile(si):
2103             return list(ss._iter_share_files(si))[0]
2104         def count_leases(si):
2105             return len(list(_get_sharefile(si).get_leases()))
2106
2107         sf0 = _get_sharefile(immutable_si_0)
2108         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2109         sf1 = _get_sharefile(immutable_si_1)
2110         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2111         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2112         sf2 = _get_sharefile(mutable_si_2)
2113         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2114         sf3 = _get_sharefile(mutable_si_3)
2115         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2116         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2117
2118         ss.setServiceParent(self.s)
2119         def _wait():
2120             return bool(lc.get_state()["last-cycle-finished"] is not None)
2121         d = self.poll(_wait)
2122
2123         def _after_first_cycle(ignored):
2124             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2125             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2126             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2127             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2128             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2129             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2130         d.addCallback(_after_first_cycle)
2131         d.addCallback(lambda ign: self.render1(webstatus))
2132         def _check_html(html):
2133             s = remove_tags(html)
2134             self.failUnlessIn("only the following sharetypes will be expired: mutable Next crawl", s)
2135         d.addCallback(_check_html)
2136         return d
2137
2138     def test_bad_mode(self):
2139         basedir = "storage/LeaseCrawler/bad_mode"
2140         fileutil.make_dirs(basedir)
2141         e = self.failUnlessRaises(ValueError,
2142                                   StorageServer, basedir, "\x00" * 20,
2143                                   expiration_mode=("bogus", 0))
2144         self.failUnless("garbage-collection mode 'bogus'"
2145                         " must be 'age' or 'date-cutoff'" in str(e), str(e))
2146
2147     def test_parse_duration(self):
2148         DAY = 24*60*60
2149         MONTH = 31*DAY
2150         YEAR = 365*DAY
2151         p = time_format.parse_duration
2152         self.failUnlessEqual(p("7days"), 7*DAY)
2153         self.failUnlessEqual(p("31day"), 31*DAY)
2154         self.failUnlessEqual(p("60 days"), 60*DAY)
2155         self.failUnlessEqual(p("2mo"), 2*MONTH)
2156         self.failUnlessEqual(p("3 month"), 3*MONTH)
2157         self.failUnlessEqual(p("2years"), 2*YEAR)
2158         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2159         self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2160                         in str(e), str(e))
2161
2162     def test_limited_history(self):
2163         basedir = "storage/LeaseCrawler/limited_history"
2164         fileutil.make_dirs(basedir)
2165         ss = StorageServer(basedir, "\x00" * 20)
2166         # make it start sooner than usual.
2167         lc = ss.lease_checker
2168         lc.slow_start = 0
2169         lc.cpu_slice = 500
2170
2171         # create a few shares, with some leases on them
2172         self.make_shares(ss)
2173
2174         ss.setServiceParent(self.s)
2175
2176         def _wait_until_15_cycles_done():
2177             last = lc.state["last-cycle-finished"]
2178             if last is not None and last >= 15:
2179                 return True
2180             if lc.timer:
2181                 lc.timer.reset(0)
2182             return False
2183         d = self.poll(_wait_until_15_cycles_done)
2184
2185         def _check(ignored):
2186             s = lc.get_state()
2187             h = s["history"]
2188             self.failUnlessEqual(len(h), 10)
2189             self.failUnlessEqual(max(h.keys()), 15)
2190             self.failUnlessEqual(min(h.keys()), 6)
2191         d.addCallback(_check)
2192         return d
2193
2194     def test_unpredictable_future(self):
2195         basedir = "storage/LeaseCrawler/unpredictable_future"
2196         fileutil.make_dirs(basedir)
2197         ss = StorageServer(basedir, "\x00" * 20)
2198         # make it start sooner than usual.
2199         lc = ss.lease_checker
2200         lc.slow_start = 0
2201         lc.cpu_slice = -1.0 # stop quickly
2202
2203         self.make_shares(ss)
2204
2205         ss.setServiceParent(self.s)
2206
2207         d = eventual.fireEventually()
2208         def _check(ignored):
2209             # this should fire after the first bucket is complete, but before
2210             # the first prefix is complete, so the progress-measurer won't
2211             # think we've gotten far enough to raise our percent-complete
2212             # above 0%, triggering the cannot-predict-the-future code in
2213             # expirer.py . This will have to change if/when the
2214             # progress-measurer gets smart enough to count buckets (we'll
2215             # have to interrupt it even earlier, before it's finished the
2216             # first bucket).
2217             s = lc.get_state()
2218             self.failUnless("cycle-to-date" in s)
2219             self.failUnless("estimated-remaining-cycle" in s)
2220             self.failUnless("estimated-current-cycle" in s)
2221
2222             left = s["estimated-remaining-cycle"]["space-recovered"]
2223             self.failUnlessEqual(left["actual-numbuckets"], None)
2224             self.failUnlessEqual(left["original-leasetimer-numbuckets"], None)
2225             self.failUnlessEqual(left["configured-leasetimer-numbuckets"], None)
2226             self.failUnlessEqual(left["actual-numshares"], None)
2227             self.failUnlessEqual(left["original-leasetimer-numshares"], None)
2228             self.failUnlessEqual(left["configured-leasetimer-numshares"], None)
2229             self.failUnlessEqual(left["actual-diskbytes"], None)
2230             self.failUnlessEqual(left["original-leasetimer-diskbytes"], None)
2231             self.failUnlessEqual(left["configured-leasetimer-diskbytes"], None)
2232             self.failUnlessEqual(left["actual-sharebytes"], None)
2233             self.failUnlessEqual(left["original-leasetimer-sharebytes"], None)
2234             self.failUnlessEqual(left["configured-leasetimer-sharebytes"], None)
2235
2236             full = s["estimated-remaining-cycle"]["space-recovered"]
2237             self.failUnlessEqual(full["actual-numbuckets"], None)
2238             self.failUnlessEqual(full["original-leasetimer-numbuckets"], None)
2239             self.failUnlessEqual(full["configured-leasetimer-numbuckets"], None)
2240             self.failUnlessEqual(full["actual-numshares"], None)
2241             self.failUnlessEqual(full["original-leasetimer-numshares"], None)
2242             self.failUnlessEqual(full["configured-leasetimer-numshares"], None)
2243             self.failUnlessEqual(full["actual-diskbytes"], None)
2244             self.failUnlessEqual(full["original-leasetimer-diskbytes"], None)
2245             self.failUnlessEqual(full["configured-leasetimer-diskbytes"], None)
2246             self.failUnlessEqual(full["actual-sharebytes"], None)
2247             self.failUnlessEqual(full["original-leasetimer-sharebytes"], None)
2248             self.failUnlessEqual(full["configured-leasetimer-sharebytes"], None)
2249
2250         d.addCallback(_check)
2251         return d
2252
2253     def test_no_st_blocks(self):
2254         basedir = "storage/LeaseCrawler/no_st_blocks"
2255         fileutil.make_dirs(basedir)
2256         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2257                                         expiration_mode=("age",-1000))
2258         # a negative expiration_time= means the "configured-leasetimer-"
2259         # space-recovered counts will be non-zero, since all shares will have
2260         # expired by then
2261
2262         # make it start sooner than usual.
2263         lc = ss.lease_checker
2264         lc.slow_start = 0
2265
2266         self.make_shares(ss)
2267         ss.setServiceParent(self.s)
2268         def _wait():
2269             return bool(lc.get_state()["last-cycle-finished"] is not None)
2270         d = self.poll(_wait)
2271
2272         def _check(ignored):
2273             s = lc.get_state()
2274             last = s["history"][0]
2275             rec = last["space-recovered"]
2276             self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 4)
2277             self.failUnlessEqual(rec["configured-leasetimer-numshares"], 4)
2278             self.failUnless(rec["configured-leasetimer-sharebytes"] > 0,
2279                             rec["configured-leasetimer-sharebytes"])
2280             # without the .st_blocks field in os.stat() results, we should be
2281             # reporting diskbytes==sharebytes
2282             self.failUnlessEqual(rec["configured-leasetimer-sharebytes"],
2283                                  rec["configured-leasetimer-diskbytes"])
2284         d.addCallback(_check)
2285         return d
2286
2287     def test_share_corruption(self):
2288         basedir = "storage/LeaseCrawler/share_corruption"
2289         fileutil.make_dirs(basedir)
2290         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2291         w = StorageStatus(ss)
2292         # make it start sooner than usual.
2293         lc = ss.lease_checker
2294         lc.stop_after_first_bucket = True
2295         lc.slow_start = 0
2296         lc.cpu_slice = 500
2297
2298         # create a few shares, with some leases on them
2299         self.make_shares(ss)
2300
2301         # now corrupt one, and make sure the lease-checker keeps going
2302         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2303         first = min(self.sis)
2304         first_b32 = base32.b2a(first)
2305         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2306         f = open(fn, "rb+")
2307         f.seek(0)
2308         f.write("BAD MAGIC")
2309         f.close()
2310         # if get_share_file() doesn't see the correct mutable magic, it
2311         # assumes the file is an immutable share, and then
2312         # immutable.ShareFile sees a bad version. So regardless of which kind
2313         # of share we corrupted, this will trigger an
2314         # UnknownImmutableContainerVersionError.
2315
2316         ss.setServiceParent(self.s)
2317
2318         d = eventual.fireEventually()
2319
2320         # now examine the state right after the first bucket has been
2321         # processed.
2322         def _after_first_bucket(ignored):
2323             so_far = lc.get_state()["cycle-to-date"]
2324             self.failUnlessEqual(so_far["buckets-examined"], 1)
2325             self.failUnlessEqual(so_far["shares-examined"], 0)
2326             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2327         d.addCallback(_after_first_bucket)
2328
2329         d.addCallback(lambda ign: self.render_json(w))
2330         def _check_json(json):
2331             data = simplejson.loads(json)
2332             # grr. json turns all dict keys into strings.
2333             so_far = data["lease-checker"]["cycle-to-date"]
2334             corrupt_shares = so_far["corrupt-shares"]
2335             # it also turns all tuples into lists
2336             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2337         d.addCallback(_check_json)
2338         d.addCallback(lambda ign: self.render1(w))
2339         def _check_html(html):
2340             s = remove_tags(html)
2341             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2342         d.addCallback(_check_html)
2343
2344         def _wait():
2345             return bool(lc.get_state()["last-cycle-finished"] is not None)
2346         d.addCallback(lambda ign: self.poll(_wait))
2347
2348         def _after_first_cycle(ignored):
2349             s = lc.get_state()
2350             last = s["history"][0]
2351             self.failUnlessEqual(last["buckets-examined"], 4)
2352             self.failUnlessEqual(last["shares-examined"], 3)
2353             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2354         d.addCallback(_after_first_cycle)
2355         d.addCallback(lambda ign: self.render_json(w))
2356         def _check_json_history(json):
2357             data = simplejson.loads(json)
2358             last = data["lease-checker"]["history"]["0"]
2359             corrupt_shares = last["corrupt-shares"]
2360             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2361         d.addCallback(_check_json_history)
2362         d.addCallback(lambda ign: self.render1(w))
2363         def _check_html_history(html):
2364             s = remove_tags(html)
2365             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2366         d.addCallback(_check_html_history)
2367
2368         def _cleanup(res):
2369             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2370                                    UnknownImmutableContainerVersionError)
2371             return res
2372         d.addBoth(_cleanup)
2373         return d
2374
2375     def render_json(self, page):
2376         d = self.render1(page, args={"t": ["json"]})
2377         return d
2378
2379 class NoStatvfsServer(StorageServer):
2380     def do_statvfs(self):
2381         raise AttributeError
2382
2383 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2384
2385     def setUp(self):
2386         self.s = service.MultiService()
2387         self.s.startService()
2388     def tearDown(self):
2389         return self.s.stopService()
2390
2391     def test_no_server(self):
2392         w = StorageStatus(None)
2393         html = w.renderSynchronously()
2394         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
2395
2396     def test_status(self):
2397         basedir = "storage/WebStatus/status"
2398         fileutil.make_dirs(basedir)
2399         ss = StorageServer(basedir, "\x00" * 20)
2400         ss.setServiceParent(self.s)
2401         w = StorageStatus(ss)
2402         d = self.render1(w)
2403         def _check_html(html):
2404             self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2405             s = remove_tags(html)
2406             self.failUnless("Accepting new shares: Yes" in s, s)
2407             self.failUnless("Reserved space: - 0 B (0)" in s, s)
2408         d.addCallback(_check_html)
2409         d.addCallback(lambda ign: self.render_json(w))
2410         def _check_json(json):
2411             data = simplejson.loads(json)
2412             s = data["stats"]
2413             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2414             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2415             self.failUnless("bucket-counter" in data)
2416             self.failUnless("lease-checker" in data)
2417         d.addCallback(_check_json)
2418         return d
2419
2420     def render_json(self, page):
2421         d = self.render1(page, args={"t": ["json"]})
2422         return d
2423
2424     def test_status_no_statvfs(self):
2425         # windows has no os.statvfs . Make sure the code handles that even on
2426         # unix.
2427         basedir = "storage/WebStatus/status_no_statvfs"
2428         fileutil.make_dirs(basedir)
2429         ss = NoStatvfsServer(basedir, "\x00" * 20)
2430         ss.setServiceParent(self.s)
2431         w = StorageStatus(ss)
2432         html = w.renderSynchronously()
2433         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2434         s = remove_tags(html)
2435         self.failUnless("Accepting new shares: Yes" in s, s)
2436         self.failUnless("Total disk space: ?" in s, s)
2437
2438     def test_readonly(self):
2439         basedir = "storage/WebStatus/readonly"
2440         fileutil.make_dirs(basedir)
2441         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2442         ss.setServiceParent(self.s)
2443         w = StorageStatus(ss)
2444         html = w.renderSynchronously()
2445         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2446         s = remove_tags(html)
2447         self.failUnless("Accepting new shares: No" in s, s)
2448
2449     def test_reserved(self):
2450         basedir = "storage/WebStatus/reserved"
2451         fileutil.make_dirs(basedir)
2452         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2453         ss.setServiceParent(self.s)
2454         w = StorageStatus(ss)
2455         html = w.renderSynchronously()
2456         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2457         s = remove_tags(html)
2458         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2459
2460     def test_huge_reserved(self):
2461         basedir = "storage/WebStatus/reserved"
2462         fileutil.make_dirs(basedir)
2463         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2464         ss.setServiceParent(self.s)
2465         w = StorageStatus(ss)
2466         html = w.renderSynchronously()
2467         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2468         s = remove_tags(html)
2469         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2470
2471     def test_util(self):
2472         w = StorageStatus(None)
2473         self.failUnlessEqual(w.render_space(None, None), "?")
2474         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2475         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2476         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2477         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2478         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2479