
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap import eventual
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

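# Test doubles: Marker is an opaque handle, and FakeCanary stands in for the
# foolscap "canary" that the storage server uses to learn about client
# disconnects. It records the registered disconnect callbacks so that tests
# (e.g. Server.test_disconnect) can invoke them by hand.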
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

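# In the Bucket and BucketProxy tests, the TestCase instance itself is passed
# to BucketWriter/BucketReader as the "server" argument: it supplies the
# bucket_writer_closed, add_latency, and count callbacks that a real
# StorageServer would normally provide.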
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

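# RemoteBucket makes a local object look like a foolscap RemoteReference:
# callRemote("foo", ...) dispatches to target.remote_foo(...) and wraps the
# result in a Deferred, so the read/write proxy classes can be exercised
# without a network connection.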
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
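        #
        # (worked out: the v1 header is 0x24 = 36 bytes, so
        # 36 + 100 + 3*224 + 102 + 504 = 1414; the v2 header is 0x44 = 68
        # bytes, so 68 + 100 + 3*224 + 102 + 504 = 1446)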

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class FakeDiskStorageServer(StorageServer):
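    # overrides StorageServer.stat_disk to report a fixed amount of free
    # space (set through the DISKAVAIL attribute) instead of doing a real
    # statvfs() call, so the reserved-space logic can be tested
    # deterministically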
    def stat_disk(self, d):
        return self.DISKAVAIL

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
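        # convenience wrapper around remote_allocate_buckets: generate fresh
        # lease secrets for each call and return the (already_have,
        # bucket_writers) tuple that the server hands back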
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares many leading bits with an extant share (but is not exactly
        the same storage index) does not add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
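        # the repr of a BucketReader is expected to name the class, the
        # storage index, and the share number; "mfwgy33dmf2g" is the start
        # of the base32 encoding of "allocate"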
        self.failUnless("BucketReader" in b_str, b_str)
        self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        b = ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnless(" had version 0 but we wanted 1" in str(e), e)

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
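        # (these numbers presumably mirror the on-disk layout: three 4-byte
        # header fields per immutable share, and 4+32+32+4 = 72 bytes per
        # lease for the owner number, renew secret, cancel secret, and
        # expiration time)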
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to adjust DISKAVAIL ourselves, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used by other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
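        # rc is a 2-tuple: a boolean saying whether every test vector
        # matched (and thus whether the writes were applied), plus a dict
        # mapping shnum to the data read back for each read-vector element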
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

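        # each test vector is (offset, length, operator, specimen): the
        # share's bytes at [offset:offset+length] are compared against the
        # specimen with the named operator, and the writes are applied only
        # if every test vector passes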
883         def reset():
884             write("si1", secrets,
885                   {0: ([], [(0,data)], None)},
886                   [])
887
888         reset()
889
890         #  lt
891         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
892                                              ],
893                                             [(0, "x"*100)],
894                                             None,
895                                             )}, [(10,5)])
896         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
899         reset()
900
901         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
902                                              ],
903                                             [(0, "x"*100)],
904                                             None,
905                                             )}, [(10,5)])
906         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
908         reset()
909
910         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
911                                              ],
912                                             [(0, "y"*100)],
913                                             None,
914                                             )}, [(10,5)])
915         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
917         reset()
918
919         #  le
920         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
921                                              ],
922                                             [(0, "x"*100)],
923                                             None,
924                                             )}, [(10,5)])
925         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
927         reset()
928
929         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
930                                              ],
931                                             [(0, "y"*100)],
932                                             None,
933                                             )}, [(10,5)])
934         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
936         reset()
937
938         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
939                                              ],
940                                             [(0, "y"*100)],
941                                             None,
942                                             )}, [(10,5)])
943         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
945         reset()
946
947         #  eq
948         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
949                                              ],
950                                             [(0, "x"*100)],
951                                             None,
952                                             )}, [(10,5)])
953         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
955         reset()
956
957         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
958                                              ],
959                                             [(0, "y"*100)],
960                                             None,
961                                             )}, [(10,5)])
962         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
964         reset()
965
966         #  ne
967         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
968                                              ],
969                                             [(0, "x"*100)],
970                                             None,
971                                             )}, [(10,5)])
972         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
974         reset()
975
976         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
977                                              ],
978                                             [(0, "y"*100)],
979                                             None,
980                                             )}, [(10,5)])
981         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
983         reset()
984
985         #  ge
986         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
987                                              ],
988                                             [(0, "y"*100)],
989                                             None,
990                                             )}, [(10,5)])
991         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
993         reset()
994
995         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
996                                              ],
997                                             [(0, "y"*100)],
998                                             None,
999                                             )}, [(10,5)])
1000         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1002         reset()
1003
1004         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1005                                              ],
1006                                             [(0, "y"*100)],
1007                                             None,
1008                                             )}, [(10,5)])
1009         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1011         reset()
1012
1013         #  gt
1014         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1015                                              ],
1016                                             [(0, "y"*100)],
1017                                             None,
1018                                             )}, [(10,5)])
1019         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1021         reset()
1022
1023         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1024                                              ],
1025                                             [(0, "x"*100)],
1026                                             None,
1027                                             )}, [(10,5)])
1028         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1030         reset()
1031
1032         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1033                                              ],
1034                                             [(0, "x"*100)],
1035                                             None,
1036                                             )}, [(10,5)])
1037         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1039         reset()
1040
1041         # finally, test some operators against empty shares
1042         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1043                                              ],
1044                                             [(0, "x"*100)],
1045                                             None,
1046                                             )}, [(10,5)])
1047         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1049         reset()
1050
1051     def test_readv(self):
1052         ss = self.create("test_readv")
1053         secrets = ( self.write_enabler("we1"),
1054                     self.renew_secret("we1"),
1055                     self.cancel_secret("we1") )
1056         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1057         write = ss.remote_slot_testv_and_readv_and_writev
1058         read = ss.remote_slot_readv
1059         data = [("%d" % i) * 100 for i in range(3)]
1060         rc = write("si1", secrets,
1061                    {0: ([], [(0,data[0])], None),
1062                     1: ([], [(0,data[1])], None),
1063                     2: ([], [(0,data[2])], None),
1064                     }, [])
1065         self.failUnlessEqual(rc, (True, {}))
1066
1067         answer = read("si1", [], [(0, 10)])
1068         self.failUnlessEqual(answer, {0: ["0"*10],
1069                                       1: ["1"*10],
1070                                       2: ["2"*10]})
1071
1072     def compare_leases_without_timestamps(self, leases_a, leases_b):
1073         self.failUnlessEqual(len(leases_a), len(leases_b))
1074         for i in range(len(leases_a)):
1075             a = leases_a[i]
1076             b = leases_b[i]
1077             self.failUnlessEqual(a.owner_num,       b.owner_num)
1078             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1079             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1080             self.failUnlessEqual(a.nodeid,          b.nodeid)
1081
1082     def compare_leases(self, leases_a, leases_b):
1083         self.failUnlessEqual(len(leases_a), len(leases_b))
1084         for i in range(len(leases_a)):
1085             a = leases_a[i]
1086             b = leases_b[i]
1087             self.failUnlessEqual(a.owner_num,       b.owner_num)
1088             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1089             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1090             self.failUnlessEqual(a.nodeid,          b.nodeid)
1091             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1092
1093     def test_leases(self):
1094         ss = self.create("test_leases")
1095         def secrets(n):
1096             return ( self.write_enabler("we1"),
1097                      self.renew_secret("we1-%d" % n),
1098                      self.cancel_secret("we1-%d" % n) )
1099         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100         write = ss.remote_slot_testv_and_readv_and_writev
1101         read = ss.remote_slot_readv
1102         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103         self.failUnlessEqual(rc, (True, {}))
1104
1105         # create a random non-numeric file in the bucket directory, to
1106         # exercise the code that's supposed to ignore those.
1107         bucket_dir = os.path.join(self.workdir("test_leases"),
1108                                   "shares", storage_index_to_dir("si1"))
1109         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110         f.write("you ought to be ignoring me\n")
1111         f.close()
1112
1113         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1115
1116         # add-lease on a missing storage index is silently ignored
1117         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1118
1119         # re-allocate the slots and use the same secrets, that should update
1120         # the lease
1121         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1123
1124         # renew it directly
1125         ss.remote_renew_lease("si1", secrets(0)[1])
1126         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1127
1128         # now allocate them with a bunch of different secrets, to trigger the
1129         # extended lease code. Use add_lease for one of them.
1130         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132         secrets2 = secrets(2)
1133         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1138
1139         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1140
1141         # cancel one of them
1142         ss.remote_cancel_lease("si1", secrets(5)[2])
1143         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1144
1145         all_leases = list(s0.get_leases())
1146         # and write enough data to expand the container, forcing the server
1147         # to move the leases
1148         write("si1", secrets(0),
1149               {0: ([], [(0,data)], 200), },
1150               [])
1151
1152         # read back the leases, make sure they're still intact.
1153         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1154
1155         ss.remote_renew_lease("si1", secrets(0)[1])
1156         ss.remote_renew_lease("si1", secrets(1)[1])
1157         ss.remote_renew_lease("si1", secrets(2)[1])
1158         ss.remote_renew_lease("si1", secrets(3)[1])
1159         ss.remote_renew_lease("si1", secrets(4)[1])
1160         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161         # get a new copy of the leases, with the current timestamps. Reading
1162         # data and failing to renew/cancel leases should leave the timestamps
1163         # alone.
1164         all_leases = list(s0.get_leases())
1165         # renewing with a bogus token should prompt an error message
1166
1167         # examine the exception thus raised, make sure the old nodeid is
1168         # present, to provide for share migration
1169         e = self.failUnlessRaises(IndexError,
1170                                   ss.remote_renew_lease, "si1",
1171                                   secrets(20)[1])
1172         e_s = str(e)
1173         self.failUnless("Unable to renew non-existent lease" in e_s)
1174         self.failUnless("I have leases accepted by nodeids:" in e_s)
1175         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1176
1177         # same for cancelling
1178         self.failUnlessRaises(IndexError,
1179                               ss.remote_cancel_lease, "si1",
1180                               secrets(20)[2])
1181         self.compare_leases(all_leases, list(s0.get_leases()))
1182
1183         # reading shares should not modify the timestamp
1184         read("si1", [], [(0,200)])
1185         self.compare_leases(all_leases, list(s0.get_leases()))
1186
1187         write("si1", secrets(0),
1188               {0: ([], [(200, "make me bigger")], None)}, [])
1189         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1190
1191         write("si1", secrets(0),
1192               {0: ([], [(500, "make me really bigger")], None)}, [])
1193         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1194
1195         # now cancel them all
1196         ss.remote_cancel_lease("si1", secrets(0)[2])
1197         ss.remote_cancel_lease("si1", secrets(1)[2])
1198         ss.remote_cancel_lease("si1", secrets(2)[2])
1199         ss.remote_cancel_lease("si1", secrets(3)[2])
1200
1201         # the slot should still be there
1202         remaining_shares = read("si1", [], [(0,10)])
1203         self.failUnlessEqual(len(remaining_shares), 1)
1204         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1205
1206         # cancelling a non-existent lease should raise an IndexError
1207         self.failUnlessRaises(IndexError,
1208                               ss.remote_cancel_lease, "si1", "nonsecret")
1209
1210         # and the slot should still be there
1211         remaining_shares = read("si1", [], [(0,10)])
1212         self.failUnlessEqual(len(remaining_shares), 1)
1213         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1214
1215         ss.remote_cancel_lease("si1", secrets(4)[2])
1216         # now the slot should be gone
1217         no_shares = read("si1", [], [(0,10)])
1218         self.failUnlessEqual(no_shares, {})
1219
1220         # cancelling a lease on a non-existent share should raise an IndexError
1221         self.failUnlessRaises(IndexError,
1222                               ss.remote_cancel_lease, "si2", "nonsecret")
1223
1224     def test_remove(self):
1225         ss = self.create("test_remove")
1226         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1227                       set([0,1,2]), 100)
1228         readv = ss.remote_slot_readv
1229         writev = ss.remote_slot_testv_and_readv_and_writev
1230         secrets = ( self.write_enabler("we1"),
1231                     self.renew_secret("we1"),
1232                     self.cancel_secret("we1") )
1233         # delete sh0 by setting its size to zero
1234         answer = writev("si1", secrets,
1235                         {0: ([], [], 0)},
1236                         [])
1237         # the answer should mention all the shares that existed before the
1238         # write
1239         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240         # but a new read should show only sh1 and sh2
1241         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1242                              {1: [""], 2: [""]})
1243
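             # note (illustrative): each test-and-write vector above has the
             # shape { shnum: (test_vector, write_vector, new_length) }, where
             # write_vector is a list of (offset, data) pairs; new_length=0
             # truncates the share to zero bytes, which is what deletes it.
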
1244         # delete sh1 by setting its size to zero
1245         answer = writev("si1", secrets,
1246                         {1: ([], [], 0)},
1247                         [])
1248         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1250                              {2: [""]})
1251
1252         # delete sh2 by setting its size to zero
1253         answer = writev("si1", secrets,
1254                         {2: ([], [], 0)},
1255                         [])
1256         self.failUnlessEqual(answer, (True, {2:[]}) )
1257         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258                              {})
1259         # and the bucket directory should now be gone
1260         si = base32.b2a("si1")
1261         # note: this is a detail of the storage server implementation, and
1262         # may change in the future
1263         prefix = si[:2]
1264         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265         bucketdir = os.path.join(prefixdir, si)
1266         self.failUnless(os.path.exists(prefixdir))
1267         self.failIf(os.path.exists(bucketdir))
1268
1269 class Stats(unittest.TestCase):
1270
1271     def setUp(self):
1272         self.sparent = LoggingServiceParent()
1273         self._lease_secret = itertools.count()
1274     def tearDown(self):
1275         return self.sparent.stopService()
1276
1277     def workdir(self, name):
1278         basedir = os.path.join("storage", "Server", name)
1279         return basedir
1280
1281     def create(self, name):
1282         workdir = self.workdir(name)
1283         ss = StorageServer(workdir, "\x00" * 20)
1284         ss.setServiceParent(self.sparent)
1285         return ss
1286
1287     def test_latencies(self):
1288         ss = self.create("test_latencies")
1289         for i in range(10000):
1290             ss.add_latency("allocate", 1.0 * i)
1291         for i in range(1000):
1292             ss.add_latency("renew", 1.0 * i)
1293         for i in range(10):
1294             ss.add_latency("cancel", 2.0 * i)
1295         ss.add_latency("get", 5.0)
1296
1297         output = ss.get_latencies()
1298
1299         self.failUnlessEqual(sorted(output.keys()),
1300                              sorted(["allocate", "renew", "cancel", "get"]))
1301         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1310
1311         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1314         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1320
1321         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1324         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1325         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1330
1331         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1340
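     # note: an illustrative sketch, not part of the original test suite. The
     # percentile assertions in test_latencies above are consistent with a
     # server that keeps only the most recent 1000 samples per category and
     # reports each percentile by indexing into the sorted sample list:
     def percentile_sketch(samples, frac):
         # e.g. percentile_sketch(range(9000, 10000), 0.01) == 9010, matching
         # the expected "01_0_percentile" for the "allocate" category above
         samples = sorted(samples)
         return samples[int(len(samples) * frac)]
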
1341 def remove_tags(s):
1342     s = re.sub(r'<[^>]*>', ' ', s)
1343     s = re.sub(r'\s+', ' ', s)
1344     return s
1345
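     # example (illustrative): remove_tags("<h1>Storage Server Status</h1>")
     # returns " Storage Server Status " -- each tag becomes a single space
     # and any run of whitespace collapses to one, so the tests can make
     # simple substring assertions against rendered HTML.
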
1346 class MyBucketCountingCrawler(BucketCountingCrawler):
1347     def finished_prefix(self, cycle, prefix):
1348         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1349         if self.hook_ds:
1350             d = self.hook_ds.pop(0)
1351             d.callback(None)
1352
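     # MyBucketCountingCrawler gives tests a synchronization point: each
     # Deferred placed in hook_ds fires as one prefix finishes. A sketch of
     # the intended use (hypothetical callback names; test_bucket_counter_eta
     # below does this for real):
     #   crawler.hook_ds = [defer.Deferred(), defer.Deferred()]
     #   crawler.hook_ds[0].addCallback(check_after_first_prefix)
     #   crawler.hook_ds[1].addCallback(check_after_second_prefix)
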
1353 class MyStorageServer(StorageServer):
1354     def add_bucket_counter(self):
1355         statefile = os.path.join(self.storedir, "bucket_counter.state")
1356         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1357         self.bucket_counter.setServiceParent(self)
1358
1359 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1360
1361     def setUp(self):
1362         self.s = service.MultiService()
1363         self.s.startService()
1364     def tearDown(self):
1365         return self.s.stopService()
1366
1367     def test_bucket_counter(self):
1368         basedir = "storage/BucketCounter/bucket_counter"
1369         fileutil.make_dirs(basedir)
1370         ss = StorageServer(basedir, "\x00" * 20)
1371         # to make sure we capture the bucket-counting-crawler in the middle
1372         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1373         # also make it start sooner than usual.
1374         ss.bucket_counter.slow_start = 0
1375         orig_cpu_slice = ss.bucket_counter.cpu_slice
1376         ss.bucket_counter.cpu_slice = 0
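             # (pacing sketch, illustrative: the crawler processes prefixes
             # until cpu_slice seconds elapse, then sleeps until its next
             # timer tick, so cpu_slice=0 yields right after the first prefix
             # and slow_start=0 skips the usual startup delay)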
1377         ss.setServiceParent(self.s)
1378
1379         w = StorageStatus(ss)
1380
1381         # this sample is before the crawler has started doing anything
1382         html = w.renderSynchronously()
1383         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384         s = remove_tags(html)
1385         self.failUnless("Accepting new shares: Yes" in s, s)
1386         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387         self.failUnless("Total buckets: Not computed yet" in s, s)
1388         self.failUnless("Next crawl in" in s, s)
1389
1390         # give the bucket-counting-crawler one tick to get started. The
1391         # cpu_slice=0 will force it to yield right after it processes the
1392         # first prefix
1393
1394         d = eventual.fireEventually()
1395         def _check(ignored):
1396             # are we really right after the first prefix?
1397             state = ss.bucket_counter.get_state()
1398             self.failUnlessEqual(state["last-complete-prefix"],
1399                                  ss.bucket_counter.prefixes[0])
1400             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401             html = w.renderSynchronously()
1402             s = remove_tags(html)
1403             self.failUnless(" Current crawl " in s, s)
1404             self.failUnless(" (next work in " in s, s)
1405         d.addCallback(_check)
1406
1407         # now give it enough time to complete a full cycle
1408         def _watch():
1409             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410         d.addCallback(lambda ignored: self.poll(_watch))
1411         def _check2(ignored):
1412             ss.bucket_counter.cpu_slice = orig_cpu_slice
1413             html = w.renderSynchronously()
1414             s = remove_tags(html)
1415             self.failUnless("Total buckets: 0 (the number of" in s, s)
1416             self.failUnless("Next crawl in 59 minutes" in s, s)
1417         d.addCallback(_check2)
1418         return d
1419
1420     def test_bucket_counter_cleanup(self):
1421         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422         fileutil.make_dirs(basedir)
1423         ss = StorageServer(basedir, "\x00" * 20)
1424         # to make sure we capture the bucket-counting-crawler in the middle
1425         # of a cycle, we reach in and reduce its maximum slice time to 0.
1426         ss.bucket_counter.slow_start = 0
1427         orig_cpu_slice = ss.bucket_counter.cpu_slice
1428         ss.bucket_counter.cpu_slice = 0
1429         ss.setServiceParent(self.s)
1430
1431         d = eventual.fireEventually()
1432
1433         def _after_first_prefix(ignored):
1434             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435             # now sneak in and mess with its state, to make sure it cleans up
1436             # properly at the end of the cycle
1437             state = ss.bucket_counter.state
1438             self.failUnlessEqual(state["last-complete-prefix"],
1439                                  ss.bucket_counter.prefixes[0])
1440             state["bucket-counts"][-12] = {}
1441             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442             ss.bucket_counter.save_state()
1443         d.addCallback(_after_first_prefix)
1444
1445         # now give it enough time to complete a cycle
1446         def _watch():
1447             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448         d.addCallback(lambda ignored: self.poll(_watch))
1449         def _check2(ignored):
1450             ss.bucket_counter.cpu_slice = orig_cpu_slice
1451             s = ss.bucket_counter.get_state()
1452             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453             self.failIf("bogusprefix!" in s["storage-index-samples"],
1454                         s["storage-index-samples"].keys())
1455         d.addCallback(_check2)
1456         return d
1457
1458     def test_bucket_counter_eta(self):
1459         basedir = "storage/BucketCounter/bucket_counter_eta"
1460         fileutil.make_dirs(basedir)
1461         ss = MyStorageServer(basedir, "\x00" * 20)
1462         ss.bucket_counter.slow_start = 0
1463         # these will be fired inside finished_prefix()
1464         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465         w = StorageStatus(ss)
1466
1467         d = defer.Deferred()
1468
1469         def _check_1(ignored):
1470             # no ETA is available yet
1471             html = w.renderSynchronously()
1472             s = remove_tags(html)
1473             self.failUnlessIn("complete (next work", s)
1474
1475         def _check_2(ignored):
1476             # one prefix has finished, so an ETA based upon that elapsed time
1477             # should be available.
1478             html = w.renderSynchronously()
1479             s = remove_tags(html)
1480             self.failUnlessIn("complete (ETA ", s)
1481
1482         def _check_3(ignored):
1483             # two prefixes have finished
1484             html = w.renderSynchronously()
1485             s = remove_tags(html)
1486             self.failUnlessIn("complete (ETA ", s)
1487             d.callback("done")
1488
1489         hooks[0].addCallback(_check_1).addErrback(d.errback)
1490         hooks[1].addCallback(_check_2).addErrback(d.errback)
1491         hooks[2].addCallback(_check_3).addErrback(d.errback)
1492
1493         ss.setServiceParent(self.s)
1494         return d
1495
1496 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1497     stop_after_first_bucket = False
1498     def process_bucket(self, *args, **kwargs):
1499         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1500         if self.stop_after_first_bucket:
1501             self.stop_after_first_bucket = False
1502             self.cpu_slice = -1.0
1503     def yielding(self, sleep_time):
1504         if not self.stop_after_first_bucket:
1505             self.cpu_slice = 500
1506
1507 class BrokenStatResults:
1508     pass
1509 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1510     def stat(self, fn):
1511         s = os.stat(fn)
1512         bsr = BrokenStatResults()
1513         for attrname in dir(s):
1514             if attrname.startswith("_"):
1515                 continue
1516             if attrname == "st_blocks":
1517                 continue
1518             setattr(bsr, attrname, getattr(s, attrname))
1519         return bsr
1520
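     # The stat() wrapper above simulates platforms whose stat results lack
     # st_blocks. The assertion in test_no_st_blocks below (diskbytes ==
     # sharebytes) is consistent with a crawler that derives disk usage
     # roughly like this (an illustrative sketch, not the actual expirer
     # code):
     #   if hasattr(statresult, "st_blocks"):
     #       diskbytes = statresult.st_blocks * 512  # allocated blocks
     #   else:
     #       diskbytes = statresult.st_size          # fall back to sharebytes
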
1521 class InstrumentedStorageServer(StorageServer):
1522     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1523 class No_ST_BLOCKS_StorageServer(StorageServer):
1524     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1525
1526 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1527
1528     def setUp(self):
1529         self.s = service.MultiService()
1530         self.s.startService()
1531     def tearDown(self):
1532         return self.s.stopService()
1533
1534     def make_shares(self, ss):
1535         def make(si):
1536             return (si, hashutil.tagged_hash("renew", si),
1537                     hashutil.tagged_hash("cancel", si))
1538         def make_mutable(si):
1539             return (si, hashutil.tagged_hash("renew", si),
1540                     hashutil.tagged_hash("cancel", si),
1541                     hashutil.tagged_hash("write-enabler", si))
1542         def make_extra_lease(si, num):
1543             return (hashutil.tagged_hash("renew-%d" % num, si),
1544                     hashutil.tagged_hash("cancel-%d" % num, si))
1545
1546         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1552         sharenums = [0]
1553         canary = FakeCanary()
1554         # note: 'tahoe debug dump-share' will not handle this file, since the
1555         # inner contents are not a valid CHK share
1556         data = "\xff" * 1000
1557
1558         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1559                                          1000, canary)
1560         w[0].remote_write(0, data)
1561         w[0].remote_close()
1562
1563         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1564                                          1000, canary)
1565         w[0].remote_write(0, data)
1566         w[0].remote_close()
1567         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1568
1569         writev = ss.remote_slot_testv_and_readv_and_writev
1570         writev(mutable_si_2, (we2, rs2, cs2),
1571                {0: ([], [(0,data)], len(data))}, [])
1572         writev(mutable_si_3, (we3, rs3, cs3),
1573                {0: ([], [(0,data)], len(data))}, [])
1574         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1575
1576         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1579
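         # fixture summary (as built above): four buckets of one share each --
         #   immutable_si_0: 1 lease    immutable_si_1: 2 leases
         #   mutable_si_2:   1 lease    mutable_si_3:   2 leases
         # which is what count_shares()/count_leases() assert in the tests
         # below.
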
1580     def test_basic(self):
1581         basedir = "storage/LeaseCrawler/basic"
1582         fileutil.make_dirs(basedir)
1583         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584         # make it start sooner than usual.
1585         lc = ss.lease_checker
1586         lc.slow_start = 0
1587         lc.cpu_slice = 500
1588         lc.stop_after_first_bucket = True
1589         webstatus = StorageStatus(ss)
1590
1591         # create a few shares, with some leases on them
1592         self.make_shares(ss)
1593         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1594
1595         # add a non-sharefile to exercise another code path
1596         fn = os.path.join(ss.sharedir,
1597                           storage_index_to_dir(immutable_si_0),
1598                           "not-a-share")
1599         f = open(fn, "wb")
1600         f.write("I am not a share.\n")
1601         f.close()
1602
1603         # this is before the crawl has started, so we're not in a cycle yet
1604         initial_state = lc.get_state()
1605         self.failIf(lc.get_progress()["cycle-in-progress"])
1606         self.failIf("cycle-to-date" in initial_state)
1607         self.failIf("estimated-remaining-cycle" in initial_state)
1608         self.failIf("estimated-current-cycle" in initial_state)
1609         self.failUnless("history" in initial_state)
1610         self.failUnlessEqual(initial_state["history"], {})
1611
1612         ss.setServiceParent(self.s)
1613
1614         DAY = 24*60*60
1615
1616         d = eventual.fireEventually()
1617
1618         # now examine the state right after the first bucket has been
1619         # processed.
1620         def _after_first_bucket(ignored):
1621             initial_state = lc.get_state()
1622             self.failUnless("cycle-to-date" in initial_state)
1623             self.failUnless("estimated-remaining-cycle" in initial_state)
1624             self.failUnless("estimated-current-cycle" in initial_state)
1625             self.failUnless("history" in initial_state)
1626             self.failUnlessEqual(initial_state["history"], {})
1627
1628             so_far = initial_state["cycle-to-date"]
1629             self.failUnlessEqual(so_far["expiration-enabled"], False)
1630             self.failUnless("configured-expiration-mode" in so_far)
1631             self.failUnless("lease-age-histogram" in so_far)
1632             lah = so_far["lease-age-histogram"]
1633             self.failUnlessEqual(type(lah), list)
1634             self.failUnlessEqual(len(lah), 1)
1635             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637             self.failUnlessEqual(so_far["corrupt-shares"], [])
1638             sr1 = so_far["space-recovered"]
1639             self.failUnlessEqual(sr1["examined-buckets"], 1)
1640             self.failUnlessEqual(sr1["examined-shares"], 1)
1641             self.failUnlessEqual(sr1["actual-shares"], 0)
1642             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1643             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1644             left = initial_state["estimated-remaining-cycle"]
1645             sr2 = left["space-recovered"]
1646             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1647             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1648             self.failIfEqual(sr2["actual-shares"], None)
1649             self.failIfEqual(sr2["configured-diskbytes"], None)
1650             self.failIfEqual(sr2["original-sharebytes"], None)
1651         d.addCallback(_after_first_bucket)
1652         d.addCallback(lambda ign: self.render1(webstatus))
1653         def _check_html_in_cycle(html):
1654             s = remove_tags(html)
1655             self.failUnlessIn("So far, this cycle has examined "
1656                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1657             self.failUnlessIn("and has recovered: "
1658                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1659                               "0 B (0 B / 0 B)", s)
1660             self.failUnlessIn("If expiration were enabled, "
1661                               "we would have recovered: "
1662                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1663                               " 0 B (0 B / 0 B) by now", s)
1664             self.failUnlessIn("and the remainder of this cycle "
1665                               "would probably recover: "
1666                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1667                               " 0 B (0 B / 0 B)", s)
1668             self.failUnlessIn("and the whole cycle would probably recover: "
1669                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1670                               " 0 B (0 B / 0 B)", s)
1671             self.failUnlessIn("if we were strictly using each lease's default "
1672                               "31-day lease lifetime", s)
1673             self.failUnlessIn("this cycle would be expected to recover: ", s)
1674         d.addCallback(_check_html_in_cycle)
1675
1676         # wait for the crawler to finish the first cycle. Nothing should have
1677         # been removed.
1678         def _wait():
1679             return lc.get_state()["last-cycle-finished"] is not None
1680         d.addCallback(lambda ign: self.poll(_wait))
1681
1682         def _after_first_cycle(ignored):
1683             s = lc.get_state()
1684             self.failIf("cycle-to-date" in s)
1685             self.failIf("estimated-remaining-cycle" in s)
1686             self.failIf("estimated-current-cycle" in s)
1687             last = s["history"][0]
1688             self.failUnless("cycle-start-finish-times" in last)
1689             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1690             self.failUnlessEqual(last["expiration-enabled"], False)
1691             self.failUnless("configured-expiration-mode" in last)
1692
1693             self.failUnless("lease-age-histogram" in last)
1694             lah = last["lease-age-histogram"]
1695             self.failUnlessEqual(type(lah), list)
1696             self.failUnlessEqual(len(lah), 1)
1697             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1698
1699             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1700             self.failUnlessEqual(last["corrupt-shares"], [])
1701
1702             rec = last["space-recovered"]
1703             self.failUnlessEqual(rec["examined-buckets"], 4)
1704             self.failUnlessEqual(rec["examined-shares"], 4)
1705             self.failUnlessEqual(rec["actual-buckets"], 0)
1706             self.failUnlessEqual(rec["original-buckets"], 0)
1707             self.failUnlessEqual(rec["configured-buckets"], 0)
1708             self.failUnlessEqual(rec["actual-shares"], 0)
1709             self.failUnlessEqual(rec["original-shares"], 0)
1710             self.failUnlessEqual(rec["configured-shares"], 0)
1711             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1712             self.failUnlessEqual(rec["original-diskbytes"], 0)
1713             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1714             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1715             self.failUnlessEqual(rec["original-sharebytes"], 0)
1716             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1717
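                 # note (illustrative): the "space-recovered" keys follow a
                 # <basis>-<unit> scheme. basis is "actual" (what this cycle
                 # really deleted), "original" (what strict 31-day lease
                 # lifetimes would have deleted), or "configured" (what the
                 # configured expiration policy would delete); unit is
                 # buckets, shares, sharebytes, or diskbytes, plus the
                 # "examined-*" counters.
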
1718             def _get_sharefile(si):
1719                 return list(ss._iter_share_files(si))[0]
1720             def count_leases(si):
1721                 return len(list(_get_sharefile(si).get_leases()))
1722             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1723             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1724             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1725             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1726         d.addCallback(_after_first_cycle)
1727         d.addCallback(lambda ign: self.render1(webstatus))
1728         def _check_html(html):
1729             s = remove_tags(html)
1730             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1731                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1732                               "but expiration was not enabled", s)
1733         d.addCallback(_check_html)
1734         d.addCallback(lambda ign: self.render_json(webstatus))
1735         def _check_json(json):
1736             data = simplejson.loads(json)
1737             self.failUnless("lease-checker" in data)
1738             self.failUnless("lease-checker-progress" in data)
1739         d.addCallback(_check_json)
1740         return d
1741
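         # (illustrative: the JSON document checked above is expected to look
         # roughly like
         #   {"lease-checker": { ...crawler state... },
         #    "lease-checker-progress": {"cycle-in-progress": False, ...}}
         # where the progress entry mirrors lc.get_progress())
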
1742     def backdate_lease(self, sf, renew_secret, new_expire_time):
1743         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1744         # "renew" a lease with a new_expire_time that is older than what the
1745         # current lease has), so we have to reach inside it.
1746         for i,lease in enumerate(sf.get_leases()):
1747             if lease.renew_secret == renew_secret:
1748                 lease.expiration_time = new_expire_time
1749                 f = open(sf.home, 'rb+')
1750                 sf._write_lease_record(f, i, lease)
1751                 f.close()
1752                 return
1753         raise IndexError("unable to renew non-existent lease")
1754
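         # usage sketch (mirroring the tests below): make a lease look
         # expired by rewriting its expiration time in place, e.g.
         #   self.backdate_lease(sf, renew_secret, time.time() - 1000)
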
1755     def test_expire_age(self):
1756         basedir = "storage/LeaseCrawler/expire_age"
1757         fileutil.make_dirs(basedir)
1758         # setting expiration_override_lease_duration to 2000 means that any
1759         # lease which is more than 2000s old will be expired.
1760         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1761                                        expiration_enabled=True,
1762                                        expiration_mode="age",
1763                                        expiration_override_lease_duration=2000)
1764         # make it start sooner than usual.
1765         lc = ss.lease_checker
1766         lc.slow_start = 0
1767         lc.stop_after_first_bucket = True
1768         webstatus = StorageStatus(ss)
1769
1770         # create a few shares, with some leases on them
1771         self.make_shares(ss)
1772         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1773
1774         def count_shares(si):
1775             return len(list(ss._iter_share_files(si)))
1776         def _get_sharefile(si):
1777             return list(ss._iter_share_files(si))[0]
1778         def count_leases(si):
1779             return len(list(_get_sharefile(si).get_leases()))
1780
1781         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1782         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1783         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1784         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1785         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1786         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1787         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1788         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1789
1790         # artificially crank back the expiration time on the first lease of
1791         # each share, to make it look like it expired already (age=1000s).
1792         # Some shares have an extra lease which is set to expire at the
1793         # default time in 31 days from now (age=31days). We then run the
1794         # crawler, which will expire the first lease, causing some shares to
1795         # be deleted and others to stay alive (with one remaining lease).
1796         now = time.time()
1797
1798         sf0 = _get_sharefile(immutable_si_0)
1799         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1800         sf0_size = os.stat(sf0.home).st_size
1801
1802         # immutable_si_1 gets an extra lease
1803         sf1 = _get_sharefile(immutable_si_1)
1804         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1805
1806         sf2 = _get_sharefile(mutable_si_2)
1807         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1808         sf2_size = os.stat(sf2.home).st_size
1809
1810         # mutable_si_3 gets an extra lease
1811         sf3 = _get_sharefile(mutable_si_3)
1812         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1813
1814         ss.setServiceParent(self.s)
1815
1816         d = eventual.fireEventually()
1817         # examine the state right after the first bucket has been processed
1818         def _after_first_bucket(ignored):
1819             p = lc.get_progress()
1820             self.failUnless(p["cycle-in-progress"])
1821         d.addCallback(_after_first_bucket)
1822         d.addCallback(lambda ign: self.render1(webstatus))
1823         def _check_html_in_cycle(html):
1824             s = remove_tags(html)
1825             # the first bucket encountered gets deleted, and its prefix
1826             # happens to be about 1/5th of the way through the ring, so the
1827             # predictor thinks we'll have 5 shares and that we'll delete them
1828             # all. This part of the test depends upon the SIs landing right
1829             # where they do now.
1830             self.failUnlessIn("The remainder of this cycle is expected to "
1831                               "recover: 4 shares, 4 buckets", s)
1832             self.failUnlessIn("The whole cycle is expected to examine "
1833                               "5 shares in 5 buckets and to recover: "
1834                               "5 shares, 5 buckets", s)
1835         d.addCallback(_check_html_in_cycle)
1836
1837         # wait for the crawler to finish the first cycle. Two shares should
1838         # have been removed
1839         def _wait():
1840             return lc.get_state()["last-cycle-finished"] is not None
1841         d.addCallback(lambda ign: self.poll(_wait))
1842
1843         def _after_first_cycle(ignored):
1844             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1845             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1846             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1847             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1848             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1849             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1850
1851             s = lc.get_state()
1852             last = s["history"][0]
1853
1854             self.failUnlessEqual(last["expiration-enabled"], True)
1855             self.failUnlessEqual(last["configured-expiration-mode"],
1856                                  ("age", 2000, None, ("mutable", "immutable")))
1857             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1858
1859             rec = last["space-recovered"]
1860             self.failUnlessEqual(rec["examined-buckets"], 4)
1861             self.failUnlessEqual(rec["examined-shares"], 4)
1862             self.failUnlessEqual(rec["actual-buckets"], 2)
1863             self.failUnlessEqual(rec["original-buckets"], 2)
1864             self.failUnlessEqual(rec["configured-buckets"], 2)
1865             self.failUnlessEqual(rec["actual-shares"], 2)
1866             self.failUnlessEqual(rec["original-shares"], 2)
1867             self.failUnlessEqual(rec["configured-shares"], 2)
1868             size = sf0_size + sf2_size
1869             self.failUnlessEqual(rec["actual-sharebytes"], size)
1870             self.failUnlessEqual(rec["original-sharebytes"], size)
1871             self.failUnlessEqual(rec["configured-sharebytes"], size)
1872             # different platforms have different notions of "blocks used by
1873             # this file", so merely assert that it's a number
1874             self.failUnless(rec["actual-diskbytes"] >= 0,
1875                             rec["actual-diskbytes"])
1876             self.failUnless(rec["original-diskbytes"] >= 0,
1877                             rec["original-diskbytes"])
1878             self.failUnless(rec["configured-diskbytes"] >= 0,
1879                             rec["configured-diskbytes"])
1880         d.addCallback(_after_first_cycle)
1881         d.addCallback(lambda ign: self.render1(webstatus))
1882         def _check_html(html):
1883             s = remove_tags(html)
1884             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1885             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1886             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1887         d.addCallback(_check_html)
1888         return d
1889
1890     def test_expire_cutoff_date(self):
1891         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1892         fileutil.make_dirs(basedir)
1893         # setting cutoff-date to 2000 seconds ago means that any lease which
1894         # is more than 2000s old will be expired.
1895         now = time.time()
1896         then = int(now - 2000)
1897         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1898                                        expiration_enabled=True,
1899                                        expiration_mode="cutoff-date",
1900                                        expiration_cutoff_date=then)
1901         # make it start sooner than usual.
1902         lc = ss.lease_checker
1903         lc.slow_start = 0
1904         lc.stop_after_first_bucket = True
1905         webstatus = StorageStatus(ss)
1906
1907         # create a few shares, with some leases on them
1908         self.make_shares(ss)
1909         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1910
1911         def count_shares(si):
1912             return len(list(ss._iter_share_files(si)))
1913         def _get_sharefile(si):
1914             return list(ss._iter_share_files(si))[0]
1915         def count_leases(si):
1916             return len(list(_get_sharefile(si).get_leases()))
1917
1918         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1919         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1920         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1921         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1922         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1923         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1924         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1925         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1926
1927         # artificially crank back the expiration time on the first lease of
1928         # each share, to make it look like it was renewed 3000s ago. To achieve
1929         # this, we need to set the expiration time to now-3000+31days. This
1930         # will change when the lease format is improved to contain both
1931         # create/renew time and duration.
1932         new_expiration_time = now - 3000 + 31*24*60*60
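             # worked example (illustrative): with the default 31-day
             # duration, a lease expiring at now-3000+31d must have been
             # renewed at (now-3000+31d) - 31d = now-3000, i.e. 3000s ago,
             # which is before the cutoff of 2000s ago, so the crawler
             # treats it as expired.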
1933
1934         # Some shares have an extra lease which is set to expire at the
1935         # default time in 31 days from now (age=31days). We then run the
1936         # crawler, which will expire the first lease, causing some shares to
1937         # be deleted and others to stay alive (with one remaining lease).
1938
1939         sf0 = _get_sharefile(immutable_si_0)
1940         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1941         sf0_size = os.stat(sf0.home).st_size
1942
1943         # immutable_si_1 gets an extra lease
1944         sf1 = _get_sharefile(immutable_si_1)
1945         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1946
1947         sf2 = _get_sharefile(mutable_si_2)
1948         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1949         sf2_size = os.stat(sf2.home).st_size
1950
1951         # mutable_si_3 gets an extra lease
1952         sf3 = _get_sharefile(mutable_si_3)
1953         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1954
1955         ss.setServiceParent(self.s)
1956
1957         d = eventual.fireEventually()
1958         # examine the state right after the first bucket has been processed
1959         def _after_first_bucket(ignored):
1960             p = lc.get_progress()
1961             self.failUnless(p["cycle-in-progress"])
1962         d.addCallback(_after_first_bucket)
1963         d.addCallback(lambda ign: self.render1(webstatus))
1964         def _check_html_in_cycle(html):
1965             s = remove_tags(html)
1966             # the first bucket encountered gets deleted, and its prefix
1967             # happens to be about 1/5th of the way through the ring, so the
1968             # predictor thinks we'll have 5 shares and that we'll delete them
1969             # all. This part of the test depends upon the SIs landing right
1970             # where they do now.
1971             self.failUnlessIn("The remainder of this cycle is expected to "
1972                               "recover: 4 shares, 4 buckets", s)
1973             self.failUnlessIn("The whole cycle is expected to examine "
1974                               "5 shares in 5 buckets and to recover: "
1975                               "5 shares, 5 buckets", s)
1976         d.addCallback(_check_html_in_cycle)
1977
1978         # wait for the crawler to finish the first cycle. Two shares should
1979         # have been removed
1980         def _wait():
1981             return lc.get_state()["last-cycle-finished"] is not None
1982         d.addCallback(lambda ign: self.poll(_wait))
1983
1984         def _after_first_cycle(ignored):
1985             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1986             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1987             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1988             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1989             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1990             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1991
1992             s = lc.get_state()
1993             last = s["history"][0]
1994
1995             self.failUnlessEqual(last["expiration-enabled"], True)
1996             self.failUnlessEqual(last["configured-expiration-mode"],
1997                                  ("cutoff-date", None, then,
1998                                   ("mutable", "immutable")))
1999             self.failUnlessEqual(last["leases-per-share-histogram"],
2000                                  {1: 2, 2: 2})
2001
2002             rec = last["space-recovered"]
2003             self.failUnlessEqual(rec["examined-buckets"], 4)
2004             self.failUnlessEqual(rec["examined-shares"], 4)
2005             self.failUnlessEqual(rec["actual-buckets"], 2)
2006             self.failUnlessEqual(rec["original-buckets"], 0)
2007             self.failUnlessEqual(rec["configured-buckets"], 2)
2008             self.failUnlessEqual(rec["actual-shares"], 2)
2009             self.failUnlessEqual(rec["original-shares"], 0)
2010             self.failUnlessEqual(rec["configured-shares"], 2)
2011             size = sf0_size + sf2_size
2012             self.failUnlessEqual(rec["actual-sharebytes"], size)
2013             self.failUnlessEqual(rec["original-sharebytes"], 0)
2014             self.failUnlessEqual(rec["configured-sharebytes"], size)
2015             # different platforms have different notions of "blocks used by
2016             # this file", so merely assert that it's a number
2017             self.failUnless(rec["actual-diskbytes"] >= 0,
2018                             rec["actual-diskbytes"])
2019             self.failUnless(rec["original-diskbytes"] >= 0,
2020                             rec["original-diskbytes"])
2021             self.failUnless(rec["configured-diskbytes"] >= 0,
2022                             rec["configured-diskbytes"])
2023         d.addCallback(_after_first_cycle)
2024         d.addCallback(lambda ign: self.render1(webstatus))
2025         def _check_html(html):
2026             s = remove_tags(html)
2027             self.failUnlessIn("Expiration Enabled:"
2028                               " expired leases will be removed", s)
2029             date = time.strftime("%d-%b-%Y", time.gmtime(then))
2030             self.failUnlessIn("Leases created or last renewed before %s"
2031                               " will be considered expired." % date, s)
2032             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2033         d.addCallback(_check_html)
2034         return d
2035
2036     def test_only_immutable(self):
2037         basedir = "storage/LeaseCrawler/only_immutable"
2038         fileutil.make_dirs(basedir)
2039         now = time.time()
2040         then = int(now - 2000)
2041         ss = StorageServer(basedir, "\x00" * 20,
2042                            expiration_enabled=True,
2043                            expiration_mode="cutoff-date",
2044                            expiration_cutoff_date=then,
2045                            expiration_sharetypes=("immutable",))
2046         lc = ss.lease_checker
2047         lc.slow_start = 0
2048         webstatus = StorageStatus(ss)
2049
2050         self.make_shares(ss)
2051         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2052         # set all leases to be expirable
2053         new_expiration_time = now - 3000 + 31*24*60*60
2054
2055         def count_shares(si):
2056             return len(list(ss._iter_share_files(si)))
2057         def _get_sharefile(si):
2058             return list(ss._iter_share_files(si))[0]
2059         def count_leases(si):
2060             return len(list(_get_sharefile(si).get_leases()))
2061
2062         sf0 = _get_sharefile(immutable_si_0)
2063         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2064         sf1 = _get_sharefile(immutable_si_1)
2065         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2066         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2067         sf2 = _get_sharefile(mutable_si_2)
2068         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2069         sf3 = _get_sharefile(mutable_si_3)
2070         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2071         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2072
2073         ss.setServiceParent(self.s)
2074         def _wait():
2075             return lc.get_state()["last-cycle-finished"] is not None
2076         d = self.poll(_wait)
2077
2078         def _after_first_cycle(ignored):
2079             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2080             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2081             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2082             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2083             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2084             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2085         d.addCallback(_after_first_cycle)
2086         d.addCallback(lambda ign: self.render1(webstatus))
2087         def _check_html(html):
2088             s = remove_tags(html)
2089             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2090         d.addCallback(_check_html)
2091         return d
2092
2093     def test_only_mutable(self):
2094         basedir = "storage/LeaseCrawler/only_mutable"
2095         fileutil.make_dirs(basedir)
2096         now = time.time()
2097         then = int(now - 2000)
2098         ss = StorageServer(basedir, "\x00" * 20,
2099                            expiration_enabled=True,
2100                            expiration_mode="cutoff-date",
2101                            expiration_cutoff_date=then,
2102                            expiration_sharetypes=("mutable",))
2103         lc = ss.lease_checker
2104         lc.slow_start = 0
2105         webstatus = StorageStatus(ss)
2106
2107         self.make_shares(ss)
2108         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2109         # set all leases to be expirable
2110         new_expiration_time = now - 3000 + 31*24*60*60
2111
2112         def count_shares(si):
2113             return len(list(ss._iter_share_files(si)))
2114         def _get_sharefile(si):
2115             return list(ss._iter_share_files(si))[0]
2116         def count_leases(si):
2117             return len(list(_get_sharefile(si).get_leases()))
2118
2119         sf0 = _get_sharefile(immutable_si_0)
2120         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2121         sf1 = _get_sharefile(immutable_si_1)
2122         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2123         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2124         sf2 = _get_sharefile(mutable_si_2)
2125         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2126         sf3 = _get_sharefile(mutable_si_3)
2127         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2128         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2129
2130         ss.setServiceParent(self.s)
2131         def _wait():
2132             return lc.get_state()["last-cycle-finished"] is not None
2133         d = self.poll(_wait)
2134
2135         def _after_first_cycle(ignored):
2136             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2137             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2138             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2139             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2140             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2141             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2142         d.addCallback(_after_first_cycle)
2143         d.addCallback(lambda ign: self.render1(webstatus))
2144         def _check_html(html):
2145             s = remove_tags(html)
2146             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2147         d.addCallback(_check_html)
2148         return d
2149
2150     def test_bad_mode(self):
2151         basedir = "storage/LeaseCrawler/bad_mode"
2152         fileutil.make_dirs(basedir)
2153         e = self.failUnlessRaises(ValueError,
2154                                   StorageServer, basedir, "\x00" * 20,
2155                                   expiration_mode="bogus")
2156         self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))
2157
2158     def test_parse_duration(self):
2159         DAY = 24*60*60
2160         MONTH = 31*DAY
2161         YEAR = 365*DAY
2162         p = time_format.parse_duration
2163         self.failUnlessEqual(p("7days"), 7*DAY)
2164         self.failUnlessEqual(p("31day"), 31*DAY)
2165         self.failUnlessEqual(p("60 days"), 60*DAY)
2166         self.failUnlessEqual(p("2mo"), 2*MONTH)
2167         self.failUnlessEqual(p("3 month"), 3*MONTH)
2168         self.failUnlessEqual(p("2years"), 2*YEAR)
2169         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2170         self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2171                         in str(e), str(e))
2172
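         # The expected values above are consistent with a parser that finds
         # a unit suffix and multiplies by a fixed constant. A minimal sketch
         # under that assumption (illustrative; not the actual time_format
         # code):
         #   def parse_duration_sketch(s):
         #       DAY = 24*60*60
         #       for unit, seconds in [("day", DAY), ("mo", 31*DAY),
         #                             ("year", 365*DAY)]:
         #           if unit in s:
         #               return int(s.split(unit[0])[0].strip()) * seconds
         #       raise ValueError("no unit (like day, month, or year) in %r"
         #                        % (s,))
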
2173     def test_parse_date(self):
2174         p = time_format.parse_date
2175         self.failUnless(isinstance(p("2009-03-18"), int))
2176         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2177
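         # parse_date("2009-03-18") == 1237334400 is midnight UTC on that
         # date; an equivalent computation (illustrative, assuming UTC
         # semantics):
         #   import calendar, time
         #   calendar.timegm(time.strptime("2009-03-18", "%Y-%m-%d"))
         #   # => 1237334400
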
2178     def test_limited_history(self):
2179         basedir = "storage/LeaseCrawler/limited_history"
2180         fileutil.make_dirs(basedir)
2181         ss = StorageServer(basedir, "\x00" * 20)
2182         # make it start sooner than usual.
2183         lc = ss.lease_checker
2184         lc.slow_start = 0
2185         lc.cpu_slice = 500
2186
2187         # create a few shares, with some leases on them
2188         self.make_shares(ss)
2189
2190         ss.setServiceParent(self.s)
2191
2192         def _wait_until_15_cycles_done():
2193             last = lc.state["last-cycle-finished"]
2194             if last is not None and last >= 15:
2195                 return True
2196             if lc.timer:
2197                 lc.timer.reset(0)
2198             return False
2199         d = self.poll(_wait_until_15_cycles_done)
2200
2201         def _check(ignored):
2202             s = lc.get_state()
2203             h = s["history"]
2204             self.failUnlessEqual(len(h), 10)
2205             self.failUnlessEqual(max(h.keys()), 15)
2206             self.failUnlessEqual(min(h.keys()), 6)
2207         d.addCallback(_check)
2208         return d
2209
2210     def test_unpredictable_future(self):
2211         basedir = "storage/LeaseCrawler/unpredictable_future"
2212         fileutil.make_dirs(basedir)
2213         ss = StorageServer(basedir, "\x00" * 20)
2214         # make it start sooner than usual.
2215         lc = ss.lease_checker
2216         lc.slow_start = 0
2217         lc.cpu_slice = -1.0 # stop quickly
2218
2219         self.make_shares(ss)
2220
2221         ss.setServiceParent(self.s)
2222
2223         d = eventual.fireEventually()
2224         def _check(ignored):
2225             # this should fire after the first bucket is complete, but before
2226             # the first prefix is complete, so the progress-measurer won't
2227             # think we've gotten far enough to raise our percent-complete
2228             # above 0%, triggering the cannot-predict-the-future code in
2229             # expirer.py . This will have to change if/when the
2230             # progress-measurer gets smart enough to count buckets (we'll
2231             # have to interrupt it even earlier, before it's finished the
2232             # first bucket).
2233             s = lc.get_state()
2234             self.failUnless("cycle-to-date" in s)
2235             self.failUnless("estimated-remaining-cycle" in s)
2236             self.failUnless("estimated-current-cycle" in s)
2237
2238             left = s["estimated-remaining-cycle"]["space-recovered"]
2239             self.failUnlessEqual(left["actual-buckets"], None)
2240             self.failUnlessEqual(left["original-buckets"], None)
2241             self.failUnlessEqual(left["configured-buckets"], None)
2242             self.failUnlessEqual(left["actual-shares"], None)
2243             self.failUnlessEqual(left["original-shares"], None)
2244             self.failUnlessEqual(left["configured-shares"], None)
2245             self.failUnlessEqual(left["actual-diskbytes"], None)
2246             self.failUnlessEqual(left["original-diskbytes"], None)
2247             self.failUnlessEqual(left["configured-diskbytes"], None)
2248             self.failUnlessEqual(left["actual-sharebytes"], None)
2249             self.failUnlessEqual(left["original-sharebytes"], None)
2250             self.failUnlessEqual(left["configured-sharebytes"], None)
2251
2252             full = s["estimated-current-cycle"]["space-recovered"]
2253             self.failUnlessEqual(full["actual-buckets"], None)
2254             self.failUnlessEqual(full["original-buckets"], None)
2255             self.failUnlessEqual(full["configured-buckets"], None)
2256             self.failUnlessEqual(full["actual-shares"], None)
2257             self.failUnlessEqual(full["original-shares"], None)
2258             self.failUnlessEqual(full["configured-shares"], None)
2259             self.failUnlessEqual(full["actual-diskbytes"], None)
2260             self.failUnlessEqual(full["original-diskbytes"], None)
2261             self.failUnlessEqual(full["configured-diskbytes"], None)
2262             self.failUnlessEqual(full["actual-sharebytes"], None)
2263             self.failUnlessEqual(full["original-sharebytes"], None)
2264             self.failUnlessEqual(full["configured-sharebytes"], None)
2265
2266         d.addCallback(_check)
2267         return d
2268
2269     def test_no_st_blocks(self):
2270         basedir = "storage/LeaseCrawler/no_st_blocks"
2271         fileutil.make_dirs(basedir)
2272         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2273                                         expiration_mode="age",
2274                                         expiration_override_lease_duration=-1000)
2275         # a negative expiration_override_lease_duration= means the "configured-"
2276         # space-recovered counts will be non-zero, since all shares will have
2277         # expired by then
2278
2279         # make it start sooner than usual.
2280         lc = ss.lease_checker
2281         lc.slow_start = 0
2282
2283         self.make_shares(ss)
2284         ss.setServiceParent(self.s)
2285         def _wait():
2286             return lc.get_state()["last-cycle-finished"] is not None
2287         d = self.poll(_wait)
2288
2289         def _check(ignored):
2290             s = lc.get_state()
2291             last = s["history"][0]
2292             rec = last["space-recovered"]
2293             self.failUnlessEqual(rec["configured-buckets"], 4)
2294             self.failUnlessEqual(rec["configured-shares"], 4)
2295             self.failUnless(rec["configured-sharebytes"] > 0,
2296                             rec["configured-sharebytes"])
2297             # without the .st_blocks field in os.stat() results, we should be
2298             # reporting diskbytes==sharebytes
2299             self.failUnlessEqual(rec["configured-sharebytes"],
2300                                  rec["configured-diskbytes"])
2301         d.addCallback(_check)
2302         return d
2303
    def test_share_corruption(self):
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
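        # (sketch of the assumed detection logic, in comment form only:
        #     prefix = open(fn, "rb").read(32)
        #     if prefix.startswith(MutableShareFile.MAGIC):
        #         ... parse as a mutable container ...
        #     else:
        #         ... the immutable path checks its version field and
        #         raises UnknownImmutableContainerVersionError on garbage ...
        # "BAD MAGIC" matches neither, so both flavors of corrupted share
        # surface as the immutable error.)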

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = eventual.fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            so_far = lc.get_state()["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})

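# For orientation (shape assumed from the assertions in this file plus the
# "include crawler progress in the JSON status output" change it exercises;
# not a normative spec), the t=json rendering looks roughly like:
#   {"stats": {...},
#    "bucket-counter": {...},
#    "lease-checker": {"cycle-to-date": {...},
#                      "estimated-remaining-cycle": {...},
#                      "estimated-current-cycle": {...},
#                      "history": {"0": {...}}},
#    "lease-checker-progress": {...}}
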
class NoStatvfsServer(StorageServer):
    # simulate a platform (e.g. windows) whose os module lacks statvfs
    def do_statvfs(self):
        raise AttributeError

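# Minimal sketch of the defensive pattern the status page is expected to
# follow (helper name hypothetical): when statvfs is unavailable, disk
# totals become "unknown" rather than an error.
def _example_disk_total(server):
    try:
        s = server.do_statvfs()
    except AttributeError:
        return None # rendered as "?" by StorageStatus
    return s.f_frsize * s.f_blocks
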
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnless("<h1>No Storage Server Running</h1>" in html, html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnless("<h1>Storage Server Status</h1>" in html, html)
            s = remove_tags(html)
            self.failUnless("Accepting new shares: Yes" in s, s)
            self.failUnless("Reserved space: - 0 B (0)" in s, s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnless("bucket-counter" in data)
            self.failUnless("lease-checker" in data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})

    def test_status_no_statvfs(self):
        # windows has no os.statvfs. Make sure the code handles that even on
        # unix.
        basedir = "storage/WebStatus/status_no_statvfs"
        fileutil.make_dirs(basedir)
        ss = NoStatvfsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Total disk space: ?" in s, s)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: No" in s, s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)

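    # Rough sketch of the accounting behind the "Reserved space" line
    # (helper name hypothetical, formula assumed): space offered to new
    # shares is whatever the filesystem reports minus the reservation,
    # floored at zero.
    def _example_space_for_shares(self, disk_avail, reserved_space):
        return max(disk_avail - reserved_space, 0)
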
    def test_huge_reserved(self):
        # like test_reserved, but with a reservation large enough to
        # exercise the bigger abbreviation tiers (the previous body was a
        # verbatim copy of test_reserved and never tested anything huge)
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e15)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 15.00 PB (15000000000000000)" in s, s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)

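# remove_prefix's contract is pinned down by test_util above; a minimal
# sketch of an implementation satisfying those assertions (illustrative,
# not the shipped code):
def _example_remove_prefix(s, prefix):
    if not s.startswith(prefix):
        return None
    return s[len(prefix):]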