
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap import eventual
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

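# Marker and FakeCanary are minimal test doubles standing in for the Foolscap
# "canary" object the storage server uses to notice client disconnects:
# notifyOnDisconnect() registers a callback and returns a handle, and
# dontNotifyOnDisconnect() unregisters it. Tests fire the recorded callbacks
# by hand (see test_disconnect below) instead of going through a real
# connection.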
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

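# RemoteBucket wraps a local BucketWriter/BucketReader behind the same
# callRemote() calling convention the proxy classes would use over the wire:
# each call is dispatched to the corresponding remote_* method on .target and
# wrapped in a Deferred via maybeDeferred, so the proxies under test cannot
# tell they are talking to a purely local object.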
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
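        # Spelled out, that arithmetic is: header (0x24 = 36 bytes for v1,
        # 0x44 = 68 bytes for v2) + 100 bytes of share data + three hash-tree
        # regions of 7*32 = 224 bytes each + 3 share-hash-chain entries of
        # 2+32 = 34 bytes each (102 total) + a 4-byte length field and 500
        # bytes of uri_extension (504), which sums to 1414 (v1) or 1446 (v2).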

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

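# FakeDiskStorageServer overrides the disk-space probe so a test can dictate
# how much space appears to be available (via the DISKAVAIL attribute)
# without touching the real filesystem; test_reserved_space below relies on
# this.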
class FakeDiskStorageServer(StorageServer):
    def stat_disk(self, d):
        return self.DISKAVAIL

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnless("BucketReader" in b_str, b_str)
        self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        b = ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnless(" had version 0 but we wanted 1" in str(e), e)

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
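        # (Rough accounting, as assumed by this test: OVERHEAD covers three
        # 4-byte header fields in an immutable share file, and LEASE_SIZE is
        # one lease record: a 4-byte owner number, two 32-byte secrets, and a
        # 4-byte expiration time.)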
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to adjust DISKAVAIL by hand, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

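        # Leases are tracked per (renew_secret, cancel_secret) pair: renewing
        # requires presenting the matching renew secret, and cancelling the
        # matching cancel secret, as the assertions below exercise.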
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

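    # The mutable-slot API takes, per share number, a (test_vector,
    # write_vector, new_length) triple: an empty test vector always passes,
    # each write-vector entry is an (offset, data) pair, and new_length=None
    # leaves the container size alone. allocate() below uses empty vectors
    # just to create the shares.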
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
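        # Each test-vector entry is (offset, length, operator, specimen): the
        # existing share data at [offset:offset+length] is compared against
        # the specimen string with the named operator, and the write vector
        # is applied only if every comparison succeeds.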
874         ss = self.create("test_operators")
875
876         secrets = ( self.write_enabler("we1"),
877                     self.renew_secret("we1"),
878                     self.cancel_secret("we1") )
879         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
880         write = ss.remote_slot_testv_and_readv_and_writev
881         read = ss.remote_slot_readv
882
883         def reset():
884             write("si1", secrets,
885                   {0: ([], [(0,data)], None)},
886                   [])
887
888         reset()
889
890         #  lt
891         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
892                                              ],
893                                             [(0, "x"*100)],
894                                             None,
895                                             )}, [(10,5)])
896         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
899         reset()
900
901         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
902                                              ],
903                                             [(0, "x"*100)],
904                                             None,
905                                             )}, [(10,5)])
906         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
908         reset()
909
910         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
911                                              ],
912                                             [(0, "y"*100)],
913                                             None,
914                                             )}, [(10,5)])
915         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
917         reset()
918
919         #  le
920         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
921                                              ],
922                                             [(0, "x"*100)],
923                                             None,
924                                             )}, [(10,5)])
925         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
927         reset()
928
929         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
930                                              ],
931                                             [(0, "y"*100)],
932                                             None,
933                                             )}, [(10,5)])
934         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
936         reset()
937
938         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
939                                              ],
940                                             [(0, "y"*100)],
941                                             None,
942                                             )}, [(10,5)])
943         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
945         reset()
946
947         #  eq
948         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
949                                              ],
950                                             [(0, "x"*100)],
951                                             None,
952                                             )}, [(10,5)])
953         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
955         reset()
956
957         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
958                                              ],
959                                             [(0, "y"*100)],
960                                             None,
961                                             )}, [(10,5)])
962         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
964         reset()
965
966         #  ne
967         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
968                                              ],
969                                             [(0, "x"*100)],
970                                             None,
971                                             )}, [(10,5)])
972         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
974         reset()
975
976         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
977                                              ],
978                                             [(0, "y"*100)],
979                                             None,
980                                             )}, [(10,5)])
981         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
983         reset()
984
985         #  ge
986         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
987                                              ],
988                                             [(0, "y"*100)],
989                                             None,
990                                             )}, [(10,5)])
991         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
993         reset()
994
995         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
996                                              ],
997                                             [(0, "y"*100)],
998                                             None,
999                                             )}, [(10,5)])
1000         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1002         reset()
1003
1004         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1005                                              ],
1006                                             [(0, "y"*100)],
1007                                             None,
1008                                             )}, [(10,5)])
1009         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1011         reset()
1012
1013         #  gt
1014         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1015                                              ],
1016                                             [(0, "y"*100)],
1017                                             None,
1018                                             )}, [(10,5)])
1019         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1021         reset()
1022
1023         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1024                                              ],
1025                                             [(0, "x"*100)],
1026                                             None,
1027                                             )}, [(10,5)])
1028         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1030         reset()
1031
1032         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1033                                              ],
1034                                             [(0, "x"*100)],
1035                                             None,
1036                                             )}, [(10,5)])
1037         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1039         reset()
1040
1041         # finally, test some operators against empty shares
1042         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1043                                              ],
1044                                             [(0, "x"*100)],
1045                                             None,
1046                                             )}, [(10,5)])
1047         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1049         reset()
1050
1051     def test_readv(self):
1052         ss = self.create("test_readv")
1053         secrets = ( self.write_enabler("we1"),
1054                     self.renew_secret("we1"),
1055                     self.cancel_secret("we1") )
1056         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1057         write = ss.remote_slot_testv_and_readv_and_writev
1058         read = ss.remote_slot_readv
1059         data = [("%d" % i) * 100 for i in range(3)]
1060         rc = write("si1", secrets,
1061                    {0: ([], [(0,data[0])], None),
1062                     1: ([], [(0,data[1])], None),
1063                     2: ([], [(0,data[2])], None),
1064                     }, [])
1065         self.failUnlessEqual(rc, (True, {}))
1066
1067         answer = read("si1", [], [(0, 10)])
1068         self.failUnlessEqual(answer, {0: ["0"*10],
1069                                       1: ["1"*10],
1070                                       2: ["2"*10]})
1071
1072     def compare_leases_without_timestamps(self, leases_a, leases_b):
1073         self.failUnlessEqual(len(leases_a), len(leases_b))
1074         for i in range(len(leases_a)):
1075             a = leases_a[i]
1076             b = leases_b[i]
1077             self.failUnlessEqual(a.owner_num,       b.owner_num)
1078             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1079             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1080             self.failUnlessEqual(a.nodeid,          b.nodeid)
1081
1082     def compare_leases(self, leases_a, leases_b):
1083         self.failUnlessEqual(len(leases_a), len(leases_b))
1084         for i in range(len(leases_a)):
1085             a = leases_a[i]
1086             b = leases_b[i]
1087             self.failUnlessEqual(a.owner_num,       b.owner_num)
1088             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1089             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1090             self.failUnlessEqual(a.nodeid,          b.nodeid)
1091             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1092
1093     def test_leases(self):
1094         ss = self.create("test_leases")
1095         def secrets(n):
1096             return ( self.write_enabler("we1"),
1097                      self.renew_secret("we1-%d" % n),
1098                      self.cancel_secret("we1-%d" % n) )
1099         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100         write = ss.remote_slot_testv_and_readv_and_writev
1101         read = ss.remote_slot_readv
1102         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103         self.failUnlessEqual(rc, (True, {}))
1104
1105         # create a random non-numeric file in the bucket directory, to
1106         # exercise the code that's supposed to ignore those.
1107         bucket_dir = os.path.join(self.workdir("test_leases"),
1108                                   "shares", storage_index_to_dir("si1"))
1109         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110         f.write("you ought to be ignoring me\n")
1111         f.close()
1112
1113         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1115
1116         # add-lease on a missing storage index is silently ignored
1117         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1118
1119         # re-allocate the slots and use the same secrets, that should update
1120         # the lease
1121         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1123
1124         # renew it directly
1125         ss.remote_renew_lease("si1", secrets(0)[1])
1126         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1127
1128         # now allocate them with a bunch of different secrets, to trigger the
1129         # extended lease code. Use add_lease for one of them.
1130         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132         secrets2 = secrets(2)
1133         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1138
1139         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1140
1141         # cancel one of them
1142         ss.remote_cancel_lease("si1", secrets(5)[2])
1143         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1144
1145         all_leases = list(s0.get_leases())
1146         # and write enough data to expand the container, forcing the server
1147         # to move the leases
1148         write("si1", secrets(0),
1149               {0: ([], [(0,data)], 200), },
1150               [])
1151
1152         # read back the leases, make sure they're still intact.
1153         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1154
1155         ss.remote_renew_lease("si1", secrets(0)[1])
1156         ss.remote_renew_lease("si1", secrets(1)[1])
1157         ss.remote_renew_lease("si1", secrets(2)[1])
1158         ss.remote_renew_lease("si1", secrets(3)[1])
1159         ss.remote_renew_lease("si1", secrets(4)[1])
1160         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161         # get a new copy of the leases, with the current timestamps. Reading
1162         # data and failing to renew/cancel leases should leave the timestamps
1163         # alone.
1164         all_leases = list(s0.get_leases())
1165         # renewing with a bogus token should prompt an error message
1166
1167         # examine the exception thus raised, make sure the old nodeid is
1168         # present, to provide for share migration
1169         e = self.failUnlessRaises(IndexError,
1170                                   ss.remote_renew_lease, "si1",
1171                                   secrets(20)[1])
1172         e_s = str(e)
1173         self.failUnless("Unable to renew non-existent lease" in e_s)
1174         self.failUnless("I have leases accepted by nodeids:" in e_s)
1175         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1176
1177         # same for cancelling
1178         self.failUnlessRaises(IndexError,
1179                               ss.remote_cancel_lease, "si1",
1180                               secrets(20)[2])
1181         self.compare_leases(all_leases, list(s0.get_leases()))
1182
1183         # reading shares should not modify the timestamp
1184         read("si1", [], [(0,200)])
1185         self.compare_leases(all_leases, list(s0.get_leases()))
1186
1187         write("si1", secrets(0),
1188               {0: ([], [(200, "make me bigger")], None)}, [])
1189         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1190
1191         write("si1", secrets(0),
1192               {0: ([], [(500, "make me really bigger")], None)}, [])
1193         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1194
1195         # now cancel them all
1196         ss.remote_cancel_lease("si1", secrets(0)[2])
1197         ss.remote_cancel_lease("si1", secrets(1)[2])
1198         ss.remote_cancel_lease("si1", secrets(2)[2])
1199         ss.remote_cancel_lease("si1", secrets(3)[2])
1200
1201         # the slot should still be there
1202         remaining_shares = read("si1", [], [(0,10)])
1203         self.failUnlessEqual(len(remaining_shares), 1)
1204         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1205
1206         # cancelling a non-existent lease should raise an IndexError
1207         self.failUnlessRaises(IndexError,
1208                               ss.remote_cancel_lease, "si1", "nonsecret")
1209
1210         # and the slot should still be there
1211         remaining_shares = read("si1", [], [(0,10)])
1212         self.failUnlessEqual(len(remaining_shares), 1)
1213         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1214
1215         ss.remote_cancel_lease("si1", secrets(4)[2])
1216         # now the slot should be gone
1217         no_shares = read("si1", [], [(0,10)])
1218         self.failUnlessEqual(no_shares, {})
1219
1220         # cancelling a lease on a non-existent share should raise an IndexError
1221         self.failUnlessRaises(IndexError,
1222                               ss.remote_cancel_lease, "si2", "nonsecret")
1223
1224     def test_remove(self):
1225         ss = self.create("test_remove")
1226         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1227                       set([0,1,2]), 100)
1228         readv = ss.remote_slot_readv
1229         writev = ss.remote_slot_testv_and_readv_and_writev
1230         secrets = ( self.write_enabler("we1"),
1231                     self.renew_secret("we1"),
1232                     self.cancel_secret("we1") )
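        # each writev entry maps shnum -> (test vector, write vector, new_length);
        # a new_length of 0 deletes that share entirely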
1233         # delete sh0 by setting its size to zero
1234         answer = writev("si1", secrets,
1235                         {0: ([], [], 0)},
1236                         [])
1237         # the answer should mention all the shares that existed before the
1238         # write
1239         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240         # but a new read should show only sh1 and sh2
1241         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1242                              {1: [""], 2: [""]})
1243
1244         # delete sh1 by setting its size to zero
1245         answer = writev("si1", secrets,
1246                         {1: ([], [], 0)},
1247                         [])
1248         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1250                              {2: [""]})
1251
1252         # delete sh2 by setting its size to zero
1253         answer = writev("si1", secrets,
1254                         {2: ([], [], 0)},
1255                         [])
1256         self.failUnlessEqual(answer, (True, {2:[]}) )
1257         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258                              {})
1259         # and the bucket directory should now be gone
1260         si = base32.b2a("si1")
1261         # note: this is a detail of the storage server implementation, and
1262         # may change in the future
1263         prefix = si[:2]
1264         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265         bucketdir = os.path.join(prefixdir, si)
1266         self.failUnless(os.path.exists(prefixdir))
1267         self.failIf(os.path.exists(bucketdir))
1268
1269 class Stats(unittest.TestCase):
1270
1271     def setUp(self):
1272         self.sparent = LoggingServiceParent()
1273         self._lease_secret = itertools.count()
1274     def tearDown(self):
1275         return self.sparent.stopService()
1276
1277     def workdir(self, name):
1278         basedir = os.path.join("storage", "Server", name)
1279         return basedir
1280
1281     def create(self, name):
1282         workdir = self.workdir(name)
1283         ss = StorageServer(workdir, "\x00" * 20)
1284         ss.setServiceParent(self.sparent)
1285         return ss
1286
1287     def test_latencies(self):
1288         ss = self.create("test_latencies")
1289         for i in range(10000):
1290             ss.add_latency("allocate", 1.0 * i)
1291         for i in range(1000):
1292             ss.add_latency("renew", 1.0 * i)
1293         for i in range(10):
1294             ss.add_latency("cancel", 2.0 * i)
1295         ss.add_latency("get", 5.0)
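        # add_latency() retains only the most recent 1000 samples per category,
        # so the 10000 "allocate" samples above reduce to the values 9000..9999;
        # the mean and percentile assertions below follow from that window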
1296
1297         output = ss.get_latencies()
1298
1299         self.failUnlessEqual(sorted(output.keys()),
1300                              sorted(["allocate", "renew", "cancel", "get"]))
1301         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1310
1311         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1314         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1320
1321         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1324         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1325         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1330
1331         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1340
1341 def remove_tags(s):
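    """Crudely strip HTML tags from a rendered page and collapse whitespace so
    tests can substring-match the visible text, e.g.
    remove_tags("<b>Total</b> buckets: 0") == " Total buckets: 0"."""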
1342     s = re.sub(r'<[^>]*>', ' ', s)
1343     s = re.sub(r'\s+', ' ', s)
1344     return s
1345
1346 class MyBucketCountingCrawler(BucketCountingCrawler):
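    """A BucketCountingCrawler that, after finishing each prefix directory,
    pops and fires the next Deferred from self.hook_ds (a list installed by
    the test), letting tests synchronize with the crawler's progress."""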
1347     def finished_prefix(self, cycle, prefix):
1348         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1349         if self.hook_ds:
1350             d = self.hook_ds.pop(0)
1351             d.callback(None)
1352
1353 class MyStorageServer(StorageServer):
1354     def add_bucket_counter(self):
1355         statefile = os.path.join(self.storedir, "bucket_counter.state")
1356         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1357         self.bucket_counter.setServiceParent(self)
1358
1359 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1360
1361     def setUp(self):
1362         self.s = service.MultiService()
1363         self.s.startService()
1364     def tearDown(self):
1365         return self.s.stopService()
1366
1367     def test_bucket_counter(self):
1368         basedir = "storage/BucketCounter/bucket_counter"
1369         fileutil.make_dirs(basedir)
1370         ss = StorageServer(basedir, "\x00" * 20)
1371         # to make sure we capture the bucket-counting-crawler in the middle
1372         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1373         # also make it start sooner than usual.
1374         ss.bucket_counter.slow_start = 0
1375         orig_cpu_slice = ss.bucket_counter.cpu_slice
1376         ss.bucket_counter.cpu_slice = 0
1377         ss.setServiceParent(self.s)
1378
1379         w = StorageStatus(ss)
1380
1381         # this sample is before the crawler has started doing anything
1382         html = w.renderSynchronously()
1383         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384         s = remove_tags(html)
1385         self.failUnless("Accepting new shares: Yes" in s, s)
1386         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387         self.failUnless("Total buckets: Not computed yet" in s, s)
1388         self.failUnless("Next crawl in" in s, s)
1389
1390         # give the bucket-counting-crawler one tick to get started. The
1391         # cpu_slice=0 will force it to yield right after it processes the
1392         # first prefix
1393
1394         d = eventual.fireEventually()
1395         def _check(ignored):
1396             # are we really right after the first prefix?
1397             state = ss.bucket_counter.get_state()
1398             self.failUnlessEqual(state["last-complete-prefix"],
1399                                  ss.bucket_counter.prefixes[0])
1400             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401             html = w.renderSynchronously()
1402             s = remove_tags(html)
1403             self.failUnless(" Current crawl " in s, s)
1404             self.failUnless(" (next work in " in s, s)
1405         d.addCallback(_check)
1406
1407         # now give it enough time to complete a full cycle
1408         def _watch():
1409             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410         d.addCallback(lambda ignored: self.poll(_watch))
1411         def _check2(ignored):
1412             ss.bucket_counter.cpu_slice = orig_cpu_slice
1413             html = w.renderSynchronously()
1414             s = remove_tags(html)
1415             self.failUnless("Total buckets: 0 (the number of" in s, s)
1416             self.failUnless("Next crawl in 59 minutes" in s, s)
1417         d.addCallback(_check2)
1418         return d
1419
1420     def test_bucket_counter_cleanup(self):
1421         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422         fileutil.make_dirs(basedir)
1423         ss = StorageServer(basedir, "\x00" * 20)
1424         # to make sure we capture the bucket-counting-crawler in the middle
1425         # of a cycle, we reach in and reduce its maximum slice time to 0.
1426         ss.bucket_counter.slow_start = 0
1427         orig_cpu_slice = ss.bucket_counter.cpu_slice
1428         ss.bucket_counter.cpu_slice = 0
1429         ss.setServiceParent(self.s)
1430
1431         d = eventual.fireEventually()
1432
1433         def _after_first_prefix(ignored):
1434             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435             # now sneak in and mess with its state, to make sure it cleans up
1436             # properly at the end of the cycle
1437             state = ss.bucket_counter.state
1438             self.failUnlessEqual(state["last-complete-prefix"],
1439                                  ss.bucket_counter.prefixes[0])
1440             state["bucket-counts"][-12] = {}
1441             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442             ss.bucket_counter.save_state()
1443         d.addCallback(_after_first_prefix)
1444
1445         # now give it enough time to complete a cycle
1446         def _watch():
1447             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448         d.addCallback(lambda ignored: self.poll(_watch))
1449         def _check2(ignored):
1450             ss.bucket_counter.cpu_slice = orig_cpu_slice
1451             s = ss.bucket_counter.get_state()
1452             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453             self.failIf("bogusprefix!" in s["storage-index-samples"],
1454                         s["storage-index-samples"].keys())
1455         d.addCallback(_check2)
1456         return d
1457
1458     def test_bucket_counter_eta(self):
1459         basedir = "storage/BucketCounter/bucket_counter_eta"
1460         fileutil.make_dirs(basedir)
1461         ss = MyStorageServer(basedir, "\x00" * 20)
1462         ss.bucket_counter.slow_start = 0
1463         # these will be fired inside finished_prefix()
1464         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465         w = StorageStatus(ss)
1466
1467         d = defer.Deferred()
1468
1469         def _check_1(ignored):
1470             # no ETA is available yet
1471             html = w.renderSynchronously()
1472             s = remove_tags(html)
1473             self.failUnlessIn("complete (next work", s)
1474
1475         def _check_2(ignored):
1476             # one prefix has finished, so an ETA based upon that elapsed time
1477             # should be available.
1478             html = w.renderSynchronously()
1479             s = remove_tags(html)
1480             self.failUnlessIn("complete (ETA ", s)
1481
1482         def _check_3(ignored):
1483             # two prefixes have finished
1484             html = w.renderSynchronously()
1485             s = remove_tags(html)
1486             self.failUnlessIn("complete (ETA ", s)
1487             d.callback("done")
1488
1489         hooks[0].addCallback(_check_1).addErrback(d.errback)
1490         hooks[1].addCallback(_check_2).addErrback(d.errback)
1491         hooks[2].addCallback(_check_3).addErrback(d.errback)
1492
1493         ss.setServiceParent(self.s)
1494         return d
1495
1496 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
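    """A LeaseCheckingCrawler that pauses itself right after processing the
    first bucket when stop_after_first_bucket is set, so tests can examine
    mid-cycle state before letting the cycle run to completion."""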
1497     stop_after_first_bucket = False
1498     def process_bucket(self, *args, **kwargs):
1499         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1500         if self.stop_after_first_bucket:
1501             self.stop_after_first_bucket = False
1502             self.cpu_slice = -1.0
1503     def yielding(self, sleep_time):
1504         if not self.stop_after_first_bucket:
1505             self.cpu_slice = 500
1506
1507 class BrokenStatResults:
1508     pass
1509 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
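    """A LeaseCheckingCrawler whose stat() results omit the st_blocks field,
    simulating platforms that do not report blocks-used; without st_blocks
    the crawler reports diskbytes equal to sharebytes (see test_no_st_blocks)."""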
1510     def stat(self, fn):
1511         s = os.stat(fn)
1512         bsr = BrokenStatResults()
1513         for attrname in dir(s):
1514             if attrname.startswith("_"):
1515                 continue
1516             if attrname == "st_blocks":
1517                 continue
1518             setattr(bsr, attrname, getattr(s, attrname))
1519         return bsr
1520
1521 class InstrumentedStorageServer(StorageServer):
1522     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1523 class No_ST_BLOCKS_StorageServer(StorageServer):
1524     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1525
1526 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1527
1528     def setUp(self):
1529         self.s = service.MultiService()
1530         self.s.startService()
1531     def tearDown(self):
1532         return self.s.stopService()
1533
1534     def make_shares(self, ss):
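        """Populate the server with two immutable shares (si_0, si_1) and two
        mutable shares (si_2, si_3), each holding one lease; si_1 and si_3 get
        a second lease. The storage indexes and lease secrets are saved on
        self.sis, self.renew_secrets, and self.cancel_secrets."""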
1535         def make(si):
1536             return (si, hashutil.tagged_hash("renew", si),
1537                     hashutil.tagged_hash("cancel", si))
1538         def make_mutable(si):
1539             return (si, hashutil.tagged_hash("renew", si),
1540                     hashutil.tagged_hash("cancel", si),
1541                     hashutil.tagged_hash("write-enabler", si))
1542         def make_extra_lease(si, num):
1543             return (hashutil.tagged_hash("renew-%d" % num, si),
1544                     hashutil.tagged_hash("cancel-%d" % num, si))
1545
1546         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1552         sharenums = [0]
1553         canary = FakeCanary()
1554         # note: 'tahoe debug dump-share' will not handle this file, since the
1555         # inner contents are not a valid CHK share
1556         data = "\xff" * 1000
1557
1558         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1559                                          1000, canary)
1560         w[0].remote_write(0, data)
1561         w[0].remote_close()
1562
1563         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1564                                          1000, canary)
1565         w[0].remote_write(0, data)
1566         w[0].remote_close()
1567         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1568
1569         writev = ss.remote_slot_testv_and_readv_and_writev
1570         writev(mutable_si_2, (we2, rs2, cs2),
1571                {0: ([], [(0,data)], len(data))}, [])
1572         writev(mutable_si_3, (we3, rs3, cs3),
1573                {0: ([], [(0,data)], len(data))}, [])
1574         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1575
1576         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1579
1580     def test_basic(self):
1581         basedir = "storage/LeaseCrawler/basic"
1582         fileutil.make_dirs(basedir)
1583         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584         # make it start sooner than usual.
1585         lc = ss.lease_checker
1586         lc.slow_start = 0
1587         lc.cpu_slice = 500
1588         lc.stop_after_first_bucket = True
1589         webstatus = StorageStatus(ss)
1590
1591         # create a few shares, with some leases on them
1592         self.make_shares(ss)
1593         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1594
1595         # add a non-sharefile to exercise another code path
1596         fn = os.path.join(ss.sharedir,
1597                           storage_index_to_dir(immutable_si_0),
1598                           "not-a-share")
1599         f = open(fn, "wb")
1600         f.write("I am not a share.\n")
1601         f.close()
1602
1603         # this is before the crawl has started, so we're not in a cycle yet
1604         initial_state = lc.get_state()
1605         self.failIf(lc.get_progress()["cycle-in-progress"])
1606         self.failIf("cycle-to-date" in initial_state)
1607         self.failIf("estimated-remaining-cycle" in initial_state)
1608         self.failIf("estimated-current-cycle" in initial_state)
1609         self.failUnless("history" in initial_state)
1610         self.failUnlessEqual(initial_state["history"], {})
1611
1612         ss.setServiceParent(self.s)
1613
1614         DAY = 24*60*60
1615
1616         d = eventual.fireEventually()
1617
1618         # now examine the state right after the first bucket has been
1619         # processed.
1620         def _after_first_bucket(ignored):
1621             initial_state = lc.get_state()
1622             self.failUnless("cycle-to-date" in initial_state)
1623             self.failUnless("estimated-remaining-cycle" in initial_state)
1624             self.failUnless("estimated-current-cycle" in initial_state)
1625             self.failUnless("history" in initial_state)
1626             self.failUnlessEqual(initial_state["history"], {})
1627
1628             so_far = initial_state["cycle-to-date"]
1629             self.failUnlessEqual(so_far["expiration-enabled"], False)
1630             self.failUnless("configured-expiration-mode" in so_far)
1631             self.failUnless("lease-age-histogram" in so_far)
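            # each histogram entry is a (min_age, max_age, count) bucket; the
            # one lease seen so far is less than a day old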
1632             lah = so_far["lease-age-histogram"]
1633             self.failUnlessEqual(type(lah), list)
1634             self.failUnlessEqual(len(lah), 1)
1635             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637             self.failUnlessEqual(so_far["corrupt-shares"], [])
1638             sr1 = so_far["space-recovered"]
1639             self.failUnlessEqual(sr1["examined-buckets"], 1)
1640             self.failUnlessEqual(sr1["examined-shares"], 1)
1641             self.failUnlessEqual(sr1["actual-shares"], 0)
1642             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1643             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1644             left = initial_state["estimated-remaining-cycle"]
1645             sr2 = left["space-recovered"]
1646             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1647             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1648             self.failIfEqual(sr2["actual-shares"], None)
1649             self.failIfEqual(sr2["configured-diskbytes"], None)
1650             self.failIfEqual(sr2["original-sharebytes"], None)
1651         d.addCallback(_after_first_bucket)
1652         d.addCallback(lambda ign: self.render1(webstatus))
1653         def _check_html_in_cycle(html):
1654             s = remove_tags(html)
1655             self.failUnlessIn("So far, this cycle has examined "
1656                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1657             self.failUnlessIn("and has recovered: "
1658                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1659                               "0 B (0 B / 0 B)", s)
1660             self.failUnlessIn("If expiration were enabled, "
1661                               "we would have recovered: "
1662                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1663                               " 0 B (0 B / 0 B) by now", s)
1664             self.failUnlessIn("and the remainder of this cycle "
1665                               "would probably recover: "
1666                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1667                               " 0 B (0 B / 0 B)", s)
1668             self.failUnlessIn("and the whole cycle would probably recover: "
1669                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1670                               " 0 B (0 B / 0 B)", s)
1671             self.failUnlessIn("if we were strictly using each lease's default "
1672                               "31-day lease lifetime", s)
1673             self.failUnlessIn("this cycle would be expected to recover: ", s)
1674         d.addCallback(_check_html_in_cycle)
1675
1676         # wait for the crawler to finish the first cycle. Nothing should have
1677         # been removed.
1678         def _wait():
1679             return bool(lc.get_state()["last-cycle-finished"] is not None)
1680         d.addCallback(lambda ign: self.poll(_wait))
1681
1682         def _after_first_cycle(ignored):
1683             s = lc.get_state()
1684             self.failIf("cycle-to-date" in s)
1685             self.failIf("estimated-remaining-cycle" in s)
1686             self.failIf("estimated-current-cycle" in s)
1687             last = s["history"][0]
1688             self.failUnless("cycle-start-finish-times" in last)
1689             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1690             self.failUnlessEqual(last["expiration-enabled"], False)
1691             self.failUnless("configured-expiration-mode" in last)
1692
1693             self.failUnless("lease-age-histogram" in last)
1694             lah = last["lease-age-histogram"]
1695             self.failUnlessEqual(type(lah), list)
1696             self.failUnlessEqual(len(lah), 1)
1697             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1698
1699             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1700             self.failUnlessEqual(last["corrupt-shares"], [])
1701
1702             rec = last["space-recovered"]
1703             self.failUnlessEqual(rec["examined-buckets"], 4)
1704             self.failUnlessEqual(rec["examined-shares"], 4)
1705             self.failUnlessEqual(rec["actual-buckets"], 0)
1706             self.failUnlessEqual(rec["original-buckets"], 0)
1707             self.failUnlessEqual(rec["configured-buckets"], 0)
1708             self.failUnlessEqual(rec["actual-shares"], 0)
1709             self.failUnlessEqual(rec["original-shares"], 0)
1710             self.failUnlessEqual(rec["configured-shares"], 0)
1711             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1712             self.failUnlessEqual(rec["original-diskbytes"], 0)
1713             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1714             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1715             self.failUnlessEqual(rec["original-sharebytes"], 0)
1716             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1717
1718             def _get_sharefile(si):
1719                 return list(ss._iter_share_files(si))[0]
1720             def count_leases(si):
1721                 return len(list(_get_sharefile(si).get_leases()))
1722             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1723             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1724             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1725             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1726         d.addCallback(_after_first_cycle)
1727         d.addCallback(lambda ign: self.render1(webstatus))
1728         def _check_html(html):
1729             s = remove_tags(html)
1730             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1731                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1732                               "but expiration was not enabled", s)
1733         d.addCallback(_check_html)
1734         return d
1735
1736     def backdate_lease(self, sf, renew_secret, new_expire_time):
1737         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1738         # "renew" a lease with a new_expire_time that is older than what the
1739         # current lease has), so we have to reach inside it.
1740         for i,lease in enumerate(sf.get_leases()):
1741             if lease.renew_secret == renew_secret:
1742                 lease.expiration_time = new_expire_time
1743                 f = open(sf.home, 'rb+')
1744                 sf._write_lease_record(f, i, lease)
1745                 f.close()
1746                 return
1747         raise IndexError("unable to renew non-existent lease")
1748
1749     def test_expire_age(self):
1750         basedir = "storage/LeaseCrawler/expire_age"
1751         fileutil.make_dirs(basedir)
1752         # setting expiration_override_lease_duration to 2000 means that any
1753         # lease which is more than 2000s old will be expired.
1754         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1755                                        expiration_enabled=True,
1756                                        expiration_mode="age",
1757                                        expiration_override_lease_duration=2000)
1758         # make it start sooner than usual.
1759         lc = ss.lease_checker
1760         lc.slow_start = 0
1761         lc.stop_after_first_bucket = True
1762         webstatus = StorageStatus(ss)
1763
1764         # create a few shares, with some leases on them
1765         self.make_shares(ss)
1766         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1767
1768         def count_shares(si):
1769             return len(list(ss._iter_share_files(si)))
1770         def _get_sharefile(si):
1771             return list(ss._iter_share_files(si))[0]
1772         def count_leases(si):
1773             return len(list(_get_sharefile(si).get_leases()))
1774
1775         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1776         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1777         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1778         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1779         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1780         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1781         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1782         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1783
1784         # artificially crank back the expiration time on the first lease of
1785         # each share, to make it look like it expired already (age=1000s).
1786         # Some shares have an extra lease which is set to expire at the
1787         # default time in 31 days from now (age=31days). We then run the
1788         # crawler, which will expire the first lease, making some shares get
1789         # deleted and others stay alive (with one remaining lease)
1790         now = time.time()
1791
1792         sf0 = _get_sharefile(immutable_si_0)
1793         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1794         sf0_size = os.stat(sf0.home).st_size
1795
1796         # immutable_si_1 gets an extra lease
1797         sf1 = _get_sharefile(immutable_si_1)
1798         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1799
1800         sf2 = _get_sharefile(mutable_si_2)
1801         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1802         sf2_size = os.stat(sf2.home).st_size
1803
1804         # mutable_si_3 gets an extra lease
1805         sf3 = _get_sharefile(mutable_si_3)
1806         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1807
1808         ss.setServiceParent(self.s)
1809
1810         d = eventual.fireEventually()
1811         # examine the state right after the first bucket has been processed
1812         def _after_first_bucket(ignored):
1813             p = lc.get_progress()
1814             self.failUnless(p["cycle-in-progress"])
1815         d.addCallback(_after_first_bucket)
1816         d.addCallback(lambda ign: self.render1(webstatus))
1817         def _check_html_in_cycle(html):
1818             s = remove_tags(html)
1819             # the first bucket encountered gets deleted, and its prefix
1820             # happens to be about 1/5th of the way through the ring, so the
1821             # predictor thinks we'll have 5 shares and that we'll delete them
1822             # all. This part of the test depends upon the SIs landing right
1823             # where they do now.
1824             self.failUnlessIn("The remainder of this cycle is expected to "
1825                               "recover: 4 shares, 4 buckets", s)
1826             self.failUnlessIn("The whole cycle is expected to examine "
1827                               "5 shares in 5 buckets and to recover: "
1828                               "5 shares, 5 buckets", s)
1829         d.addCallback(_check_html_in_cycle)
1830
1831         # wait for the crawler to finish the first cycle. Two shares should
1832         # have been removed
1833         def _wait():
1834             return bool(lc.get_state()["last-cycle-finished"] is not None)
1835         d.addCallback(lambda ign: self.poll(_wait))
1836
1837         def _after_first_cycle(ignored):
1838             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1839             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1840             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1841             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1842             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1843             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1844
1845             s = lc.get_state()
1846             last = s["history"][0]
1847
1848             self.failUnlessEqual(last["expiration-enabled"], True)
1849             self.failUnlessEqual(last["configured-expiration-mode"],
1850                                  ("age", 2000, None, ("mutable", "immutable")))
1851             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1852
1853             rec = last["space-recovered"]
1854             self.failUnlessEqual(rec["examined-buckets"], 4)
1855             self.failUnlessEqual(rec["examined-shares"], 4)
1856             self.failUnlessEqual(rec["actual-buckets"], 2)
1857             self.failUnlessEqual(rec["original-buckets"], 2)
1858             self.failUnlessEqual(rec["configured-buckets"], 2)
1859             self.failUnlessEqual(rec["actual-shares"], 2)
1860             self.failUnlessEqual(rec["original-shares"], 2)
1861             self.failUnlessEqual(rec["configured-shares"], 2)
1862             size = sf0_size + sf2_size
1863             self.failUnlessEqual(rec["actual-sharebytes"], size)
1864             self.failUnlessEqual(rec["original-sharebytes"], size)
1865             self.failUnlessEqual(rec["configured-sharebytes"], size)
1866             # different platforms have different notions of "blocks used by
1867             # this file", so merely assert that it's a non-negative number
1868             self.failUnless(rec["actual-diskbytes"] >= 0,
1869                             rec["actual-diskbytes"])
1870             self.failUnless(rec["original-diskbytes"] >= 0,
1871                             rec["original-diskbytes"])
1872             self.failUnless(rec["configured-diskbytes"] >= 0,
1873                             rec["configured-diskbytes"])
1874         d.addCallback(_after_first_cycle)
1875         d.addCallback(lambda ign: self.render1(webstatus))
1876         def _check_html(html):
1877             s = remove_tags(html)
1878             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1879             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1880             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1881         d.addCallback(_check_html)
1882         return d
1883
1884     def test_expire_cutoff_date(self):
1885         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1886         fileutil.make_dirs(basedir)
1887         # setting cutoff-date to 2000 seconds ago means that any lease which
1888         # is more than 2000s old will be expired.
1889         now = time.time()
1890         then = int(now - 2000)
1891         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1892                                        expiration_enabled=True,
1893                                        expiration_mode="cutoff-date",
1894                                        expiration_cutoff_date=then)
1895         # make it start sooner than usual.
1896         lc = ss.lease_checker
1897         lc.slow_start = 0
1898         lc.stop_after_first_bucket = True
1899         webstatus = StorageStatus(ss)
1900
1901         # create a few shares, with some leases on them
1902         self.make_shares(ss)
1903         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1904
1905         def count_shares(si):
1906             return len(list(ss._iter_share_files(si)))
1907         def _get_sharefile(si):
1908             return list(ss._iter_share_files(si))[0]
1909         def count_leases(si):
1910             return len(list(_get_sharefile(si).get_leases()))
1911
1912         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1913         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1914         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1915         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1916         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1917         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1918         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1919         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1920
1921         # artificially crank back the expiration time on the first lease of
1922         # each share, to make it look like it was renewed 3000s ago. To achieve
1923         # this, we need to set the expiration time to now-3000+31days. This
1924         # will change when the lease format is improved to contain both
1925         # create/renew time and duration.
1926         new_expiration_time = now - 3000 + 31*24*60*60
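        # (a lease that looks 3000s old is older than the 2000s-ago cutoff, so
        # it will be treated as expired)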
1927
1928         # Some shares have an extra lease which is set to expire at the
1929         # default time in 31 days from now (age=31days). We then run the
1930         # crawler, which will expire the first lease, making some shares get
1931         # deleted and others stay alive (with one remaining lease)
1932
1933         sf0 = _get_sharefile(immutable_si_0)
1934         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1935         sf0_size = os.stat(sf0.home).st_size
1936
1937         # immutable_si_1 gets an extra lease
1938         sf1 = _get_sharefile(immutable_si_1)
1939         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1940
1941         sf2 = _get_sharefile(mutable_si_2)
1942         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1943         sf2_size = os.stat(sf2.home).st_size
1944
1945         # mutable_si_3 gets an extra lease
1946         sf3 = _get_sharefile(mutable_si_3)
1947         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1948
1949         ss.setServiceParent(self.s)
1950
1951         d = eventual.fireEventually()
1952         # examine the state right after the first bucket has been processed
1953         def _after_first_bucket(ignored):
1954             p = lc.get_progress()
1955             self.failUnless(p["cycle-in-progress"])
1956         d.addCallback(_after_first_bucket)
1957         d.addCallback(lambda ign: self.render1(webstatus))
1958         def _check_html_in_cycle(html):
1959             s = remove_tags(html)
1960             # the first bucket encountered gets deleted, and its prefix
1961             # happens to be about 1/5th of the way through the ring, so the
1962             # predictor thinks we'll have 5 shares and that we'll delete them
1963             # all. This part of the test depends upon the SIs landing right
1964             # where they do now.
1965             self.failUnlessIn("The remainder of this cycle is expected to "
1966                               "recover: 4 shares, 4 buckets", s)
1967             self.failUnlessIn("The whole cycle is expected to examine "
1968                               "5 shares in 5 buckets and to recover: "
1969                               "5 shares, 5 buckets", s)
1970         d.addCallback(_check_html_in_cycle)
1971
1972         # wait for the crawler to finish the first cycle. Two shares should
1973         # have been removed
1974         def _wait():
1975             return bool(lc.get_state()["last-cycle-finished"] is not None)
1976         d.addCallback(lambda ign: self.poll(_wait))
1977
1978         def _after_first_cycle(ignored):
1979             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1980             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1981             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1982             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1983             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1984             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1985
1986             s = lc.get_state()
1987             last = s["history"][0]
1988
1989             self.failUnlessEqual(last["expiration-enabled"], True)
1990             self.failUnlessEqual(last["configured-expiration-mode"],
1991                                  ("cutoff-date", None, then,
1992                                   ("mutable", "immutable")))
1993             self.failUnlessEqual(last["leases-per-share-histogram"],
1994                                  {1: 2, 2: 2})
1995
1996             rec = last["space-recovered"]
1997             self.failUnlessEqual(rec["examined-buckets"], 4)
1998             self.failUnlessEqual(rec["examined-shares"], 4)
1999             self.failUnlessEqual(rec["actual-buckets"], 2)
2000             self.failUnlessEqual(rec["original-buckets"], 0)
2001             self.failUnlessEqual(rec["configured-buckets"], 2)
2002             self.failUnlessEqual(rec["actual-shares"], 2)
2003             self.failUnlessEqual(rec["original-shares"], 0)
2004             self.failUnlessEqual(rec["configured-shares"], 2)
2005             size = sf0_size + sf2_size
2006             self.failUnlessEqual(rec["actual-sharebytes"], size)
2007             self.failUnlessEqual(rec["original-sharebytes"], 0)
2008             self.failUnlessEqual(rec["configured-sharebytes"], size)
2009             # different platforms have different notions of "blocks used by
2010             # this file", so merely assert that it's a non-negative number
2011             self.failUnless(rec["actual-diskbytes"] >= 0,
2012                             rec["actual-diskbytes"])
2013             self.failUnless(rec["original-diskbytes"] >= 0,
2014                             rec["original-diskbytes"])
2015             self.failUnless(rec["configured-diskbytes"] >= 0,
2016                             rec["configured-diskbytes"])
2017         d.addCallback(_after_first_cycle)
2018         d.addCallback(lambda ign: self.render1(webstatus))
2019         def _check_html(html):
2020             s = remove_tags(html)
2021             self.failUnlessIn("Expiration Enabled:"
2022                               " expired leases will be removed", s)
2023             date = time.strftime("%d-%b-%Y", time.gmtime(then))
2024             self.failUnlessIn("Leases created or last renewed before %s"
2025                               " will be considered expired." % date, s)
2026             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2027         d.addCallback(_check_html)
2028         return d
2029
2030     def test_only_immutable(self):
2031         basedir = "storage/LeaseCrawler/only_immutable"
2032         fileutil.make_dirs(basedir)
2033         now = time.time()
2034         then = int(now - 2000)
2035         ss = StorageServer(basedir, "\x00" * 20,
2036                            expiration_enabled=True,
2037                            expiration_mode="cutoff-date",
2038                            expiration_cutoff_date=then,
2039                            expiration_sharetypes=("immutable",))
2040         lc = ss.lease_checker
2041         lc.slow_start = 0
2042         webstatus = StorageStatus(ss)
2043
2044         self.make_shares(ss)
2045         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2046         # set all leases to be expirable
2047         new_expiration_time = now - 3000 + 31*24*60*60
2048
2049         def count_shares(si):
2050             return len(list(ss._iter_share_files(si)))
2051         def _get_sharefile(si):
2052             return list(ss._iter_share_files(si))[0]
2053         def count_leases(si):
2054             return len(list(_get_sharefile(si).get_leases()))
2055
2056         sf0 = _get_sharefile(immutable_si_0)
2057         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2058         sf1 = _get_sharefile(immutable_si_1)
2059         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2060         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2061         sf2 = _get_sharefile(mutable_si_2)
2062         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2063         sf3 = _get_sharefile(mutable_si_3)
2064         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2065         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2066
2067         ss.setServiceParent(self.s)
2068         def _wait():
2069             return bool(lc.get_state()["last-cycle-finished"] is not None)
2070         d = self.poll(_wait)
2071
2072         def _after_first_cycle(ignored):
2073             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2074             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2075             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2076             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2077             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2078             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2079         d.addCallback(_after_first_cycle)
2080         d.addCallback(lambda ign: self.render1(webstatus))
2081         def _check_html(html):
2082             s = remove_tags(html)
2083             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2084         d.addCallback(_check_html)
2085         return d
2086
2087     def test_only_mutable(self):
2088         basedir = "storage/LeaseCrawler/only_mutable"
2089         fileutil.make_dirs(basedir)
2090         now = time.time()
2091         then = int(now - 2000)
2092         ss = StorageServer(basedir, "\x00" * 20,
2093                            expiration_enabled=True,
2094                            expiration_mode="cutoff-date",
2095                            expiration_cutoff_date=then,
2096                            expiration_sharetypes=("mutable",))
2097         lc = ss.lease_checker
2098         lc.slow_start = 0
2099         webstatus = StorageStatus(ss)
2100
2101         self.make_shares(ss)
2102         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2103         # set all leases to be expirable
2104         new_expiration_time = now - 3000 + 31*24*60*60
2105
2106         def count_shares(si):
2107             return len(list(ss._iter_share_files(si)))
2108         def _get_sharefile(si):
2109             return list(ss._iter_share_files(si))[0]
2110         def count_leases(si):
2111             return len(list(_get_sharefile(si).get_leases()))
2112
2113         sf0 = _get_sharefile(immutable_si_0)
2114         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2115         sf1 = _get_sharefile(immutable_si_1)
2116         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2117         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2118         sf2 = _get_sharefile(mutable_si_2)
2119         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2120         sf3 = _get_sharefile(mutable_si_3)
2121         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2122         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2123
2124         ss.setServiceParent(self.s)
2125         def _wait():
2126             return bool(lc.get_state()["last-cycle-finished"] is not None)
2127         d = self.poll(_wait)
2128
2129         def _after_first_cycle(ignored):
2130             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2131             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2132             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2133             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2134             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2135             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2136         d.addCallback(_after_first_cycle)
2137         d.addCallback(lambda ign: self.render1(webstatus))
2138         def _check_html(html):
2139             s = remove_tags(html)
2140             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2141         d.addCallback(_check_html)
2142         return d
2143
2144     def test_bad_mode(self):
2145         basedir = "storage/LeaseCrawler/bad_mode"
2146         fileutil.make_dirs(basedir)
2147         e = self.failUnlessRaises(ValueError,
2148                                   StorageServer, basedir, "\x00" * 20,
2149                                   expiration_mode="bogus")
2150         self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))
2151
2152     def test_parse_duration(self):
2153         DAY = 24*60*60
2154         MONTH = 31*DAY
2155         YEAR = 365*DAY
2156         p = time_format.parse_duration
2157         self.failUnlessEqual(p("7days"), 7*DAY)
2158         self.failUnlessEqual(p("31day"), 31*DAY)
2159         self.failUnlessEqual(p("60 days"), 60*DAY)
2160         self.failUnlessEqual(p("2mo"), 2*MONTH)
2161         self.failUnlessEqual(p("3 month"), 3*MONTH)
2162         self.failUnlessEqual(p("2years"), 2*YEAR)
2163         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2164         self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2165                         in str(e), str(e))
2166
2167     def test_parse_date(self):
2168         p = time_format.parse_date
2169         self.failUnless(isinstance(p("2009-03-18"), int))
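        # 1237334400 is 2009-03-18 00:00:00 UTC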
2170         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2171
2172     def test_limited_history(self):
2173         basedir = "storage/LeaseCrawler/limited_history"
2174         fileutil.make_dirs(basedir)
2175         ss = StorageServer(basedir, "\x00" * 20)
2176         # make it start sooner than usual.
2177         lc = ss.lease_checker
2178         lc.slow_start = 0
2179         lc.cpu_slice = 500
2180
2181         # create a few shares, with some leases on them
2182         self.make_shares(ss)
2183
2184         ss.setServiceParent(self.s)
2185
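        # resetting lc.timer makes each new cycle start immediately, so fifteen
        # cycles finish quickly; the crawler's "history" should retain only the
        # ten most recent of them (cycles 6 through 15)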
2186         def _wait_until_15_cycles_done():
2187             last = lc.state["last-cycle-finished"]
2188             if last is not None and last >= 15:
2189                 return True
2190             if lc.timer:
2191                 lc.timer.reset(0)
2192             return False
2193         d = self.poll(_wait_until_15_cycles_done)
2194
2195         def _check(ignored):
2196             s = lc.get_state()
2197             h = s["history"]
2198             self.failUnlessEqual(len(h), 10)
2199             self.failUnlessEqual(max(h.keys()), 15)
2200             self.failUnlessEqual(min(h.keys()), 6)
2201         d.addCallback(_check)
2202         return d
2203
2204     def test_unpredictable_future(self):
2205         basedir = "storage/LeaseCrawler/unpredictable_future"
2206         fileutil.make_dirs(basedir)
2207         ss = StorageServer(basedir, "\x00" * 20)
2208         # make it start sooner than usual.
2209         lc = ss.lease_checker
2210         lc.slow_start = 0
2211         lc.cpu_slice = -1.0 # stop quickly
2212
2213         self.make_shares(ss)
2214
2215         ss.setServiceParent(self.s)
2216
2217         d = eventual.fireEventually()
2218         def _check(ignored):
2219             # this should fire after the first bucket is complete, but before
2220             # the first prefix is complete, so the progress-measurer won't
2221             # think we've gotten far enough to raise our percent-complete
2222             # above 0%, triggering the cannot-predict-the-future code in
2223             # expirer.py . This will have to change if/when the
2224             # progress-measurer gets smart enough to count buckets (we'll
2225             # have to interrupt it even earlier, before it's finished the
2226             # first bucket).
2227             s = lc.get_state()
2228             self.failUnless("cycle-to-date" in s)
2229             self.failUnless("estimated-remaining-cycle" in s)
2230             self.failUnless("estimated-current-cycle" in s)
2231
2232             left = s["estimated-remaining-cycle"]["space-recovered"]
2233             self.failUnlessEqual(left["actual-buckets"], None)
2234             self.failUnlessEqual(left["original-buckets"], None)
2235             self.failUnlessEqual(left["configured-buckets"], None)
2236             self.failUnlessEqual(left["actual-shares"], None)
2237             self.failUnlessEqual(left["original-shares"], None)
2238             self.failUnlessEqual(left["configured-shares"], None)
2239             self.failUnlessEqual(left["actual-diskbytes"], None)
2240             self.failUnlessEqual(left["original-diskbytes"], None)
2241             self.failUnlessEqual(left["configured-diskbytes"], None)
2242             self.failUnlessEqual(left["actual-sharebytes"], None)
2243             self.failUnlessEqual(left["original-sharebytes"], None)
2244             self.failUnlessEqual(left["configured-sharebytes"], None)
2245
2246             full = s["estimated-current-cycle"]["space-recovered"]
2247             self.failUnlessEqual(full["actual-buckets"], None)
2248             self.failUnlessEqual(full["original-buckets"], None)
2249             self.failUnlessEqual(full["configured-buckets"], None)
2250             self.failUnlessEqual(full["actual-shares"], None)
2251             self.failUnlessEqual(full["original-shares"], None)
2252             self.failUnlessEqual(full["configured-shares"], None)
2253             self.failUnlessEqual(full["actual-diskbytes"], None)
2254             self.failUnlessEqual(full["original-diskbytes"], None)
2255             self.failUnlessEqual(full["configured-diskbytes"], None)
2256             self.failUnlessEqual(full["actual-sharebytes"], None)
2257             self.failUnlessEqual(full["original-sharebytes"], None)
2258             self.failUnlessEqual(full["configured-sharebytes"], None)
2259
2260         d.addCallback(_check)
2261         return d
2262
2263     def test_no_st_blocks(self):
2264         basedir = "storage/LeaseCrawler/no_st_blocks"
2265         fileutil.make_dirs(basedir)
2266         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2267                                         expiration_mode="age",
2268                                         expiration_override_lease_duration=-1000)
2269         # a negative expiration_override_lease_duration means the "configured-"
2270         # space-recovered counts will be non-zero, since all shares will have
2271         # expired by then
2272
2273         # make it start sooner than usual.
2274         lc = ss.lease_checker
2275         lc.slow_start = 0
2276
2277         self.make_shares(ss)
2278         ss.setServiceParent(self.s)
2279         def _wait():
2280             return bool(lc.get_state()["last-cycle-finished"] is not None)
2281         d = self.poll(_wait)
2282
2283         def _check(ignored):
2284             s = lc.get_state()
2285             last = s["history"][0]
2286             rec = last["space-recovered"]
2287             self.failUnlessEqual(rec["configured-buckets"], 4)
2288             self.failUnlessEqual(rec["configured-shares"], 4)
2289             self.failUnless(rec["configured-sharebytes"] > 0,
2290                             rec["configured-sharebytes"])
2291             # without the .st_blocks field in os.stat() results, we should be
2292             # reporting diskbytes==sharebytes
2293             self.failUnlessEqual(rec["configured-sharebytes"],
2294                                  rec["configured-diskbytes"])
2295         d.addCallback(_check)
2296         return d
2297
2298     def test_share_corruption(self):
2299         basedir = "storage/LeaseCrawler/share_corruption"
2300         fileutil.make_dirs(basedir)
2301         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2302         w = StorageStatus(ss)
2303         # make it start sooner than usual.
2304         lc = ss.lease_checker
2305         lc.stop_after_first_bucket = True
2306         lc.slow_start = 0
2307         lc.cpu_slice = 500
2308
2309         # create a few shares, with some leases on them
2310         self.make_shares(ss)
2311
2312         # now corrupt one, and make sure the lease-checker keeps going
2313         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2314         first = min(self.sis)
2315         first_b32 = base32.b2a(first)
2316         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2317         f = open(fn, "rb+")
2318         f.seek(0)
2319         f.write("BAD MAGIC")
2320         f.close()
2321         # if get_share_file() doesn't see the correct mutable magic, it
2322         # assumes the file is an immutable share, and then
2323         # immutable.ShareFile sees a bad version. So regardless of which kind
2324         # of share we corrupted, this will trigger an
2325         # UnknownImmutableContainerVersionError.
2326
2327         # also create an empty bucket
2328         empty_si = base32.b2a("\x04"*16)
2329         empty_bucket_dir = os.path.join(ss.sharedir,
2330                                         storage_index_to_dir(empty_si))
2331         fileutil.make_dirs(empty_bucket_dir)
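        # the lease checker must tolerate a bucket directory that contains no
        # shares: it should count as an examined bucket without contributing
        # any shares or recovered space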
2332
2333         ss.setServiceParent(self.s)
2334
2335         d = eventual.fireEventually()
2336
2337         # now examine the state right after the first bucket has been
2338         # processed.
2339         def _after_first_bucket(ignored):
2340             so_far = lc.get_state()["cycle-to-date"]
2341             rec = so_far["space-recovered"]
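            # the corrupted bucket is counted as examined, but its share is
            # reported under corrupt-shares instead of examined-shares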
2342             self.failUnlessEqual(rec["examined-buckets"], 1)
2343             self.failUnlessEqual(rec["examined-shares"], 0)
2344             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2345         d.addCallback(_after_first_bucket)
2346
2347         d.addCallback(lambda ign: self.render_json(w))
2348         def _check_json(json):
2349             data = simplejson.loads(json)
2350             # grr. json turns all dict keys into strings.
2351             so_far = data["lease-checker"]["cycle-to-date"]
2352             corrupt_shares = so_far["corrupt-shares"]
2353             # it also turns all tuples into lists
2354             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2355         d.addCallback(_check_json)
2356         d.addCallback(lambda ign: self.render1(w))
2357         def _check_html(html):
2358             s = remove_tags(html)
2359             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2360         d.addCallback(_check_html)
2361
2362         def _wait():
2363             return bool(lc.get_state()["last-cycle-finished"] is not None)
2364         d.addCallback(lambda ign: self.poll(_wait))
2365
2366         def _after_first_cycle(ignored):
2367             s = lc.get_state()
2368             last = s["history"][0]
2369             rec = last["space-recovered"]
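            # all five buckets (the four from make_shares plus the empty one)
            # were examined; the corrupted share is excluded from the three
            # examined shares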
2370             self.failUnlessEqual(rec["examined-buckets"], 5)
2371             self.failUnlessEqual(rec["examined-shares"], 3)
2372             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2373         d.addCallback(_after_first_cycle)
2374         d.addCallback(lambda ign: self.render_json(w))
2375         def _check_json_history(json):
2376             data = simplejson.loads(json)
2377             last = data["lease-checker"]["history"]["0"]
2378             corrupt_shares = last["corrupt-shares"]
2379             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2380         d.addCallback(_check_json_history)
2381         d.addCallback(lambda ign: self.render1(w))
2382         def _check_html_history(html):
2383             s = remove_tags(html)
2384             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2385         d.addCallback(_check_html_history)
2386
2387         def _cleanup(res):
2388             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2389                                    UnknownImmutableContainerVersionError)
2390             return res
2391         d.addBoth(_cleanup)
2392         return d
2393
2394     def render_json(self, page):
2395         d = self.render1(page, args={"t": ["json"]})
2396         return d
2397
2398 class NoStatvfsServer(StorageServer):
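    # simulate a platform without os.statvfs (e.g. windows): the overridden
    # do_statvfs raises AttributeError, just as referencing a missing
    # os.statvfs would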
2399     def do_statvfs(self):
2400         raise AttributeError
2401
2402 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2403
2404     def setUp(self):
2405         self.s = service.MultiService()
2406         self.s.startService()
2407     def tearDown(self):
2408         return self.s.stopService()
2409
2410     def test_no_server(self):
2411         w = StorageStatus(None)
2412         html = w.renderSynchronously()
2413         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
2414
2415     def test_status(self):
2416         basedir = "storage/WebStatus/status"
2417         fileutil.make_dirs(basedir)
2418         ss = StorageServer(basedir, "\x00" * 20)
2419         ss.setServiceParent(self.s)
2420         w = StorageStatus(ss)
2421         d = self.render1(w)
2422         def _check_html(html):
2423             self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2424             s = remove_tags(html)
2425             self.failUnless("Accepting new shares: Yes" in s, s)
2426             self.failUnless("Reserved space: - 0 B (0)" in s, s)
2427         d.addCallback(_check_html)
2428         d.addCallback(lambda ign: self.render_json(w))
2429         def _check_json(json):
2430             data = simplejson.loads(json)
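            # the JSON rendering carries the server's stats dict plus the
            # state of both crawlers (bucket counter and lease checker)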
2431             s = data["stats"]
2432             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2433             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2434             self.failUnless("bucket-counter" in data)
2435             self.failUnless("lease-checker" in data)
2436         d.addCallback(_check_json)
2437         return d
2438
2439     def render_json(self, page):
2440         d = self.render1(page, args={"t": ["json"]})
2441         return d
2442
2443     def test_status_no_statvfs(self):
2444         # Windows has no os.statvfs. Make sure the code handles its absence
2445         # even on unix.
2446         basedir = "storage/WebStatus/status_no_statvfs"
2447         fileutil.make_dirs(basedir)
2448         ss = NoStatvfsServer(basedir, "\x00" * 20)
2449         ss.setServiceParent(self.s)
2450         w = StorageStatus(ss)
2451         html = w.renderSynchronously()
2452         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2453         s = remove_tags(html)
2454         self.failUnless("Accepting new shares: Yes" in s, s)
2455         self.failUnless("Total disk space: ?" in s, s)
2456
2457     def test_readonly(self):
2458         basedir = "storage/WebStatus/readonly"
2459         fileutil.make_dirs(basedir)
2460         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2461         ss.setServiceParent(self.s)
2462         w = StorageStatus(ss)
2463         html = w.renderSynchronously()
2464         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2465         s = remove_tags(html)
2466         self.failUnless("Accepting new shares: No" in s, s)
2467
2468     def test_reserved(self):
2469         basedir = "storage/WebStatus/reserved"
2470         fileutil.make_dirs(basedir)
2471         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2472         ss.setServiceParent(self.s)
2473         w = StorageStatus(ss)
2474         html = w.renderSynchronously()
2475         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2476         s = remove_tags(html)
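        # reserved space is shown with a leading "- " because it is
        # subtracted from the space the server will offer for shares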
2477         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2478
2479     def test_huge_reserved(self):
2480         basedir = "storage/WebStatus/huge_reserved"
2481         fileutil.make_dirs(basedir)
2482         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e12)
2483         ss.setServiceParent(self.s)
2484         w = StorageStatus(ss)
2485         html = w.renderSynchronously()
2486         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2487         s = remove_tags(html)
2488         self.failUnless("Reserved space: - 10.00 TB (10000000000000)" in s, s)
2489
2490     def test_util(self):
2491         w = StorageStatus(None)
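        # render_space gives exact byte counts, render_abbrev_space gives
        # abbreviated human-readable sizes, and remove_prefix returns None
        # when the string does not start with the given prefix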
2492         self.failUnlessEqual(w.render_space(None, None), "?")
2493         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2494         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2495         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2496         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2497         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2498