import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

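# The classes below are hand-rolled fakes for the foolscap and stats
# machinery the storage code normally talks to. FakeCanary mimics the
# "canary" remote reference whose notifyOnDisconnect() hook BucketWriter
# uses to notice a client going away: it merely records the registered
# callbacks so a test can fire them by hand (see Server.test_disconnect
# below).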
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

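    # Note: BucketWriter and BucketReader are constructed with `self` (this
    # TestCase) as their server argument, so the no-op callbacks below
    # satisfy the small slice of the StorageServer interface they call back
    # into.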
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

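# RemoteBucket makes a local bucket object drivable through the same
# callRemote() interface a foolscap RemoteReference offers: each
# callRemote("foo", ...) is dispatched to target.remote_foo(...) and the
# result is wrapped in a Deferred via maybeDeferred.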
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
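        # (0x24 == 36 and 0x44 == 68 are the v1 and v2 header sizes. Each
        # 7-node hash tree contributes 7*32 == 224 bytes, the three
        # (sharenum, hash) pairs contribute 3*34 == 102, and the
        # length-prefixed URI extension 4+500 == 504, so the v1 total is
        # 36 + 100 + 672 + 102 + 504 == 1414, and the v2 total is 32 more.)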

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

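# A StorageServer whose disk-space probe is stubbed out: tests assign
# DISKAVAIL directly instead of measuring the real filesystem, which makes
# the reserved-space accounting in Server.test_reserved_space deterministic.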
class FakeDiskStorageServer(StorageServer):
    DISKAVAIL = 0
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL,
                 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a
        # sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only desktop/server system I know of that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't exactly
        the same storage index), this won't add a new entry to the share
        directory.
        """
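        # Shares are stored under shares/<prefix>/<storage_index>/<shnum>,
        # where <prefix> is derived from the leading characters of the
        # base32-encoded storage index (see storage_index_to_dir), so
        # "storageindex" and "storageindey" below land in the same prefix
        # directory.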
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: the minimum version in use is v1
        f.close()

        # reading from an unrelated (empty) storage index should still work
        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't make real calls to get_disk_stats
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
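        # (presumably the immutable sharefile's three 4-byte header fields
        # (version, data length, lease count), plus one lease record holding
        # a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time)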
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_disk_stats(self):
        # This will spuriously fail if there is zero disk space left (but so
        # will other tests).
        ss = self.create("test_disk_stats", reserved_space=0)

        disk = ss.get_disk_stats()
        self.failUnless(disk['total'] > 0, disk['total'])
        self.failUnless(disk['used'] > 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)

        disk = ss.get_disk_stats()
        self.failUnlessEqual(disk['avail'], 0)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
        f2.close()


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

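    # A quick map of the slot API exercised below (inferred from its use in
    # these tests): remote_slot_testv_and_readv_and_writev(storage_index,
    # (write_enabler, renew_secret, cancel_secret), test_and_write_vectors,
    # read_vector) takes test_and_write_vectors of the form
    #   {shnum: (testv, writev, new_length)}
    # where each testv entry is (offset, length, operator, specimen), each
    # writev entry is (offset, data), and new_length (or None) resizes the
    # container. The read_vector is a list of (offset, length) pairs, and
    # the call returns (testv_passed, {shnum: [read_data..]}).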
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", str(f))

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the testv operators; the data we're comparing against is
        # "11111" in all cases. Test both failing and passing cases, and
        # reset the data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

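        # Each testv below examines the 5 bytes at offset 10 of share 0
        # (currently "11111") and compares them against a specimen with the
        # named operator. If the comparison holds, the accompanying write
        # replaces the share with 100 bytes of "x" or "y"; if it fails, the
        # write is not applied. Either way, the answer echoes the tested
        # bytes from the requested read vector.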
        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

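    # Two lease-comparison helpers: compare_leases_without_timestamps
    # ignores expiration_time (for checks that must hold across renewals,
    # since renewing a lease updates its timestamp), while compare_leases
    # demands an exact match.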
    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())

        # renewing with a bogus token should prompt an error message; examine
        # the exception thus raised, and make sure the old nodeid is present,
        # to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

1238     def test_remove(self):
1239         ss = self.create("test_remove")
1240         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1241                       set([0,1,2]), 100)
1242         readv = ss.remote_slot_readv
1243         writev = ss.remote_slot_testv_and_readv_and_writev
1244         secrets = ( self.write_enabler("we1"),
1245                     self.renew_secret("we1"),
1246                     self.cancel_secret("we1") )
1247         # delete sh0 by setting its size to zero
1248         answer = writev("si1", secrets,
1249                         {0: ([], [], 0)},
1250                         [])
1251         # the answer should mention all the shares that existed before the
1252         # write
1253         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1254         # but a new read should show only sh1 and sh2
1255         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1256                              {1: [""], 2: [""]})
1257
1258         # delete sh1 by setting its size to zero
1259         answer = writev("si1", secrets,
1260                         {1: ([], [], 0)},
1261                         [])
1262         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1263         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1264                              {2: [""]})
1265
1266         # delete sh2 by setting its size to zero
1267         answer = writev("si1", secrets,
1268                         {2: ([], [], 0)},
1269                         [])
1270         self.failUnlessEqual(answer, (True, {2:[]}) )
1271         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1272                              {})
1273         # and the bucket directory should now be gone
1274         si = base32.b2a("si1")
1275         # note: this is a detail of the storage server implementation, and
1276         # may change in the future
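        # (the layout is shares/<first two chars of the base32 SI>/<base32 SI>/<shnum>)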
1277         prefix = si[:2]
1278         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1279         bucketdir = os.path.join(prefixdir, si)
1280         self.failUnless(os.path.exists(prefixdir), prefixdir)
1281         self.failIf(os.path.exists(bucketdir), bucketdir)
1282
1283 class Stats(unittest.TestCase):
1284
1285     def setUp(self):
1286         self.sparent = LoggingServiceParent()
1287         self._lease_secret = itertools.count()
1288     def tearDown(self):
1289         return self.sparent.stopService()
1290
1291     def workdir(self, name):
1292         basedir = os.path.join("storage", "Server", name)
1293         return basedir
1294
1295     def create(self, name):
1296         workdir = self.workdir(name)
1297         ss = StorageServer(workdir, "\x00" * 20)
1298         ss.setServiceParent(self.sparent)
1299         return ss
1300
1301     def test_latencies(self):
1302         ss = self.create("test_latencies")
1303         for i in range(10000):
1304             ss.add_latency("allocate", 1.0 * i)
1305         for i in range(1000):
1306             ss.add_latency("renew", 1.0 * i)
1307         for i in range(10):
1308             ss.add_latency("cancel", 2.0 * i)
1309         ss.add_latency("get", 5.0)
1310
1311         output = ss.get_latencies()
1312
1313         self.failUnlessEqual(sorted(output.keys()),
1314                              sorted(["allocate", "renew", "cancel", "get"]))
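        # only the most recent 1000 samples per category are retained, so the
        # 10000 "allocate" samples collapse to the last 1000 (values
        # 9000..9999, hence a mean near 9500)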
1315         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1316         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1317         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1318         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1319         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1320         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1321         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1322         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1323         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1324
1325         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1326         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1327         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1328         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1329         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1330         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1331         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1332         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1333         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1334
1335         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1336         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1337         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1, output)
1338         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1339         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1340         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1341         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1342         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1343         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1344
1345         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1346         self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1347         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1348         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1349         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1350         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1351         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1352         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1353         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1354
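# For reference, a minimal sketch of the percentile computation that the
# assertions in Stats.test_latencies imply (an illustration derived from the
# expected values above, not necessarily the server's implementation): sort
# the retained samples and index at int(fraction * len(samples)).
def _example_percentile(samples, fraction):
    # e.g. _example_percentile(range(9000, 10000), 0.01) == 9010
    ordered = sorted(samples)
    return ordered[int(fraction * len(ordered))]
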
1355 def remove_tags(s):
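    # strip HTML tags and collapse runs of whitespace, so tests can make
    # substring assertions against the rendered page's plain text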
1356     s = re.sub(r'<[^>]*>', ' ', s)
1357     s = re.sub(r'\s+', ' ', s)
1358     return s
1359
1360 class MyBucketCountingCrawler(BucketCountingCrawler):
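    # fires one Deferred from self.hook_ds each time a prefix finishes, so
    # tests can synchronize with the crawler's progress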
1361     def finished_prefix(self, cycle, prefix):
1362         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1363         if self.hook_ds:
1364             d = self.hook_ds.pop(0)
1365             d.callback(None)
1366
1367 class MyStorageServer(StorageServer):
1368     def add_bucket_counter(self):
1369         statefile = os.path.join(self.storedir, "bucket_counter.state")
1370         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1371         self.bucket_counter.setServiceParent(self)
1372
1373 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1374
1375     def setUp(self):
1376         self.s = service.MultiService()
1377         self.s.startService()
1378     def tearDown(self):
1379         return self.s.stopService()
1380
1381     def test_bucket_counter(self):
1382         basedir = "storage/BucketCounter/bucket_counter"
1383         fileutil.make_dirs(basedir)
1384         ss = StorageServer(basedir, "\x00" * 20)
1385         # to make sure we capture the bucket-counting-crawler in the middle
1386         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1387         # also make it start sooner than usual.
1388         ss.bucket_counter.slow_start = 0
1389         orig_cpu_slice = ss.bucket_counter.cpu_slice
1390         ss.bucket_counter.cpu_slice = 0
1391         ss.setServiceParent(self.s)
1392
1393         w = StorageStatus(ss)
1394
1395         # this sample is before the crawler has started doing anything
1396         html = w.renderSynchronously()
1397         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1398         s = remove_tags(html)
1399         self.failUnlessIn("Accepting new shares: Yes", s)
1400         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1401         self.failUnlessIn("Total buckets: Not computed yet", s)
1402         self.failUnlessIn("Next crawl in", s)
1403
1404         # give the bucket-counting-crawler one tick to get started. The
1405         # cpu_slice=0 will force it to yield right after it processes the
1406         # first prefix
1407
1408         d = fireEventually()
1409         def _check(ignored):
1410             # are we really right after the first prefix?
1411             state = ss.bucket_counter.get_state()
1412             self.failUnlessEqual(state["last-complete-prefix"],
1413                                  ss.bucket_counter.prefixes[0])
1414             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1415             html = w.renderSynchronously()
1416             s = remove_tags(html)
1417             self.failUnlessIn(" Current crawl ", s)
1418             self.failUnlessIn(" (next work in ", s)
1419         d.addCallback(_check)
1420
1421         # now give it enough time to complete a full cycle
1422         def _watch():
1423             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1424         d.addCallback(lambda ignored: self.poll(_watch))
1425         def _check2(ignored):
1426             ss.bucket_counter.cpu_slice = orig_cpu_slice
1427             html = w.renderSynchronously()
1428             s = remove_tags(html)
1429             self.failUnlessIn("Total buckets: 0 (the number of", s)
1430             self.failUnlessIn("Next crawl in 59 minutes", s)
1431         d.addCallback(_check2)
1432         return d
1433
1434     def test_bucket_counter_cleanup(self):
1435         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1436         fileutil.make_dirs(basedir)
1437         ss = StorageServer(basedir, "\x00" * 20)
1438         # to make sure we capture the bucket-counting-crawler in the middle
1439         # of a cycle, we reach in and reduce its maximum slice time to 0.
1440         ss.bucket_counter.slow_start = 0
1441         orig_cpu_slice = ss.bucket_counter.cpu_slice
1442         ss.bucket_counter.cpu_slice = 0
1443         ss.setServiceParent(self.s)
1444
1445         d = fireEventually()
1446
1447         def _after_first_prefix(ignored):
1448             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1449             # now sneak in and mess with its state, to make sure it cleans up
1450             # properly at the end of the cycle
1451             state = ss.bucket_counter.state
1452             self.failUnlessEqual(state["last-complete-prefix"],
1453                                  ss.bucket_counter.prefixes[0])
1454             state["bucket-counts"][-12] = {}
1455             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1456             ss.bucket_counter.save_state()
1457         d.addCallback(_after_first_prefix)
1458
1459         # now give it enough time to complete a cycle
1460         def _watch():
1461             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1462         d.addCallback(lambda ignored: self.poll(_watch))
1463         def _check2(ignored):
1464             ss.bucket_counter.cpu_slice = orig_cpu_slice
1465             s = ss.bucket_counter.get_state()
1466             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1467             self.failIf("bogusprefix!" in s["storage-index-samples"],
1468                         s["storage-index-samples"].keys())
1469         d.addCallback(_check2)
1470         return d
1471
1472     def test_bucket_counter_eta(self):
1473         basedir = "storage/BucketCounter/bucket_counter_eta"
1474         fileutil.make_dirs(basedir)
1475         ss = MyStorageServer(basedir, "\x00" * 20)
1476         ss.bucket_counter.slow_start = 0
1477         # these will be fired inside finished_prefix()
1478         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1479         w = StorageStatus(ss)
1480
1481         d = defer.Deferred()
1482
1483         def _check_1(ignored):
1484             # no ETA is available yet
1485             html = w.renderSynchronously()
1486             s = remove_tags(html)
1487             self.failUnlessIn("complete (next work", s)
1488
1489         def _check_2(ignored):
1490             # one prefix has finished, so an ETA based upon that elapsed time
1491             # should be available.
1492             html = w.renderSynchronously()
1493             s = remove_tags(html)
1494             self.failUnlessIn("complete (ETA ", s)
1495
1496         def _check_3(ignored):
1497             # two prefixes have finished
1498             html = w.renderSynchronously()
1499             s = remove_tags(html)
1500             self.failUnlessIn("complete (ETA ", s)
1501             d.callback("done")
1502
1503         hooks[0].addCallback(_check_1).addErrback(d.errback)
1504         hooks[1].addCallback(_check_2).addErrback(d.errback)
1505         hooks[2].addCallback(_check_3).addErrback(d.errback)
1506
1507         ss.setServiceParent(self.s)
1508         return d
1509
1510 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1511     stop_after_first_bucket = False
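    # process_bucket() expires the crawler's time slice (cpu_slice < 0) right
    # after the first bucket, forcing an immediate yield; yielding() then
    # restores a generous slice so the rest of the cycle finishes quickly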
1512     def process_bucket(self, *args, **kwargs):
1513         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1514         if self.stop_after_first_bucket:
1515             self.stop_after_first_bucket = False
1516             self.cpu_slice = -1.0
1517     def yielding(self, sleep_time):
1518         if not self.stop_after_first_bucket:
1519             self.cpu_slice = 500
1520
1521 class BrokenStatResults:
1522     pass
1523 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
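    # simulate a platform whose os.stat() results lack the st_blocks field by
    # copying every other stat attribute onto a plain object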
1524     def stat(self, fn):
1525         s = os.stat(fn)
1526         bsr = BrokenStatResults()
1527         for attrname in dir(s):
1528             if attrname.startswith("_"):
1529                 continue
1530             if attrname == "st_blocks":
1531                 continue
1532             setattr(bsr, attrname, getattr(s, attrname))
1533         return bsr
1534
1535 class InstrumentedStorageServer(StorageServer):
1536     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1537 class No_ST_BLOCKS_StorageServer(StorageServer):
1538     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1539
1540 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1541
1542     def setUp(self):
1543         self.s = service.MultiService()
1544         self.s.startService()
1545     def tearDown(self):
1546         return self.s.stopService()
1547
1548     def make_shares(self, ss):
1549         def make(si):
1550             return (si, hashutil.tagged_hash("renew", si),
1551                     hashutil.tagged_hash("cancel", si))
1552         def make_mutable(si):
1553             return (si, hashutil.tagged_hash("renew", si),
1554                     hashutil.tagged_hash("cancel", si),
1555                     hashutil.tagged_hash("write-enabler", si))
1556         def make_extra_lease(si, num):
1557             return (hashutil.tagged_hash("renew-%d" % num, si),
1558                     hashutil.tagged_hash("cancel-%d" % num, si))
1559
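        # build four buckets: two immutable shares (immutable_si_1 carries an
        # extra lease) and two mutable shares (mutable_si_3 carries an extra
        # lease), six leases in total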
1560         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1561         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1562         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1563         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1564         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1565         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1566         sharenums = [0]
1567         canary = FakeCanary()
1568         # note: 'tahoe debug dump-share' will not handle this file, since the
1569         # inner contents are not a valid CHK share
1570         data = "\xff" * 1000
1571
1572         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1573                                          1000, canary)
1574         w[0].remote_write(0, data)
1575         w[0].remote_close()
1576
1577         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1578                                          1000, canary)
1579         w[0].remote_write(0, data)
1580         w[0].remote_close()
1581         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1582
1583         writev = ss.remote_slot_testv_and_readv_and_writev
1584         writev(mutable_si_2, (we2, rs2, cs2),
1585                {0: ([], [(0,data)], len(data))}, [])
1586         writev(mutable_si_3, (we3, rs3, cs3),
1587                {0: ([], [(0,data)], len(data))}, [])
1588         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1589
1590         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1591         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1592         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1593
1594     def test_basic(self):
1595         basedir = "storage/LeaseCrawler/basic"
1596         fileutil.make_dirs(basedir)
1597         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1598         # make it start sooner than usual.
1599         lc = ss.lease_checker
1600         lc.slow_start = 0
1601         lc.cpu_slice = 500
1602         lc.stop_after_first_bucket = True
1603         webstatus = StorageStatus(ss)
1604
1605         # create a few shares, with some leases on them
1606         self.make_shares(ss)
1607         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1608
1609         # add a non-sharefile to exercise another code path
1610         fn = os.path.join(ss.sharedir,
1611                           storage_index_to_dir(immutable_si_0),
1612                           "not-a-share")
1613         f = open(fn, "wb")
1614         f.write("I am not a share.\n")
1615         f.close()
1616
1617         # this is before the crawl has started, so we're not in a cycle yet
1618         initial_state = lc.get_state()
1619         self.failIf(lc.get_progress()["cycle-in-progress"])
1620         self.failIfIn("cycle-to-date", initial_state)
1621         self.failIfIn("estimated-remaining-cycle", initial_state)
1622         self.failIfIn("estimated-current-cycle", initial_state)
1623         self.failUnlessIn("history", initial_state)
1624         self.failUnlessEqual(initial_state["history"], {})
1625
1626         ss.setServiceParent(self.s)
1627
1628         DAY = 24*60*60
1629
1630         d = fireEventually()
1631
1632         # now examine the state right after the first bucket has been
1633         # processed.
1634         def _after_first_bucket(ignored):
1635             initial_state = lc.get_state()
1636             self.failUnlessIn("cycle-to-date", initial_state)
1637             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1638             self.failUnlessIn("estimated-current-cycle", initial_state)
1639             self.failUnlessIn("history", initial_state)
1640             self.failUnlessEqual(initial_state["history"], {})
1641
1642             so_far = initial_state["cycle-to-date"]
1643             self.failUnlessEqual(so_far["expiration-enabled"], False)
1644             self.failUnlessIn("configured-expiration-mode", so_far)
1645             self.failUnlessIn("lease-age-histogram", so_far)
1646             lah = so_far["lease-age-histogram"]
1647             self.failUnlessEqual(type(lah), list)
1648             self.failUnlessEqual(len(lah), 1)
1649             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1650             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1651             self.failUnlessEqual(so_far["corrupt-shares"], [])
1652             sr1 = so_far["space-recovered"]
1653             self.failUnlessEqual(sr1["examined-buckets"], 1)
1654             self.failUnlessEqual(sr1["examined-shares"], 1)
1655             self.failUnlessEqual(sr1["actual-shares"], 0)
1656             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1657             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1658             left = initial_state["estimated-remaining-cycle"]
1659             sr2 = left["space-recovered"]
1660             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1661             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1662             self.failIfEqual(sr2["actual-shares"], None)
1663             self.failIfEqual(sr2["configured-diskbytes"], None)
1664             self.failIfEqual(sr2["original-sharebytes"], None)
1665         d.addCallback(_after_first_bucket)
1666         d.addCallback(lambda ign: self.render1(webstatus))
1667         def _check_html_in_cycle(html):
1668             s = remove_tags(html)
1669             self.failUnlessIn("So far, this cycle has examined "
1670                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1671             self.failUnlessIn("and has recovered: "
1672                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1673                               "0 B (0 B / 0 B)", s)
1674             self.failUnlessIn("If expiration were enabled, "
1675                               "we would have recovered: "
1676                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1677                               " 0 B (0 B / 0 B) by now", s)
1678             self.failUnlessIn("and the remainder of this cycle "
1679                               "would probably recover: "
1680                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1681                               " 0 B (0 B / 0 B)", s)
1682             self.failUnlessIn("and the whole cycle would probably recover: "
1683                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1684                               " 0 B (0 B / 0 B)", s)
1685             self.failUnlessIn("if we were strictly using each lease's default "
1686                               "31-day lease lifetime", s)
1687             self.failUnlessIn("this cycle would be expected to recover: ", s)
1688         d.addCallback(_check_html_in_cycle)
1689
1690         # wait for the crawler to finish the first cycle. Nothing should have
1691         # been removed.
1692         def _wait():
1693             return bool(lc.get_state()["last-cycle-finished"] is not None)
1694         d.addCallback(lambda ign: self.poll(_wait))
1695
1696         def _after_first_cycle(ignored):
1697             s = lc.get_state()
1698             self.failIf("cycle-to-date" in s)
1699             self.failIf("estimated-remaining-cycle" in s)
1700             self.failIf("estimated-current-cycle" in s)
1701             last = s["history"][0]
1702             self.failUnlessIn("cycle-start-finish-times", last)
1703             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1704             self.failUnlessEqual(last["expiration-enabled"], False)
1705             self.failUnlessIn("configured-expiration-mode", last)
1706
1707             self.failUnlessIn("lease-age-histogram", last)
1708             lah = last["lease-age-histogram"]
1709             self.failUnlessEqual(type(lah), list)
1710             self.failUnlessEqual(len(lah), 1)
1711             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1712
1713             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1714             self.failUnlessEqual(last["corrupt-shares"], [])
1715
1716             rec = last["space-recovered"]
1717             self.failUnlessEqual(rec["examined-buckets"], 4)
1718             self.failUnlessEqual(rec["examined-shares"], 4)
1719             self.failUnlessEqual(rec["actual-buckets"], 0)
1720             self.failUnlessEqual(rec["original-buckets"], 0)
1721             self.failUnlessEqual(rec["configured-buckets"], 0)
1722             self.failUnlessEqual(rec["actual-shares"], 0)
1723             self.failUnlessEqual(rec["original-shares"], 0)
1724             self.failUnlessEqual(rec["configured-shares"], 0)
1725             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1726             self.failUnlessEqual(rec["original-diskbytes"], 0)
1727             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1728             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1729             self.failUnlessEqual(rec["original-sharebytes"], 0)
1730             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1731
1732             def _get_sharefile(si):
1733                 return list(ss._iter_share_files(si))[0]
1734             def count_leases(si):
1735                 return len(list(_get_sharefile(si).get_leases()))
1736             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1737             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1738             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1739             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1740         d.addCallback(_after_first_cycle)
1741         d.addCallback(lambda ign: self.render1(webstatus))
1742         def _check_html(html):
1743             s = remove_tags(html)
1744             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1745                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1746             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1747                               "(2 mutable / 2 immutable),", s)
1748             self.failUnlessIn("but expiration was not enabled", s)
1749         d.addCallback(_check_html)
1750         d.addCallback(lambda ign: self.render_json(webstatus))
1751         def _check_json(json):
1752             data = simplejson.loads(json)
1753             self.failUnlessIn("lease-checker", data)
1754             self.failUnlessIn("lease-checker-progress", data)
1755         d.addCallback(_check_json)
1756         return d
1757
1758     def backdate_lease(self, sf, renew_secret, new_expire_time):
1759         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1760         # "renew" a lease with a new_expire_time that is older than what the
1761         # current lease has), so we have to reach inside it.
1762         for i,lease in enumerate(sf.get_leases()):
1763             if lease.renew_secret == renew_secret:
1764                 lease.expiration_time = new_expire_time
1765                 f = open(sf.home, 'rb+')
1766                 sf._write_lease_record(f, i, lease)
1767                 f.close()
1768                 return
1769         raise IndexError("unable to renew non-existent lease")
1770
1771     def test_expire_age(self):
1772         basedir = "storage/LeaseCrawler/expire_age"
1773         fileutil.make_dirs(basedir)
1774         # setting expiration_override_lease_duration to 2000 means that any
1775         # lease which is more than 2000s old will be expired.
1776         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1777                                        expiration_enabled=True,
1778                                        expiration_mode="age",
1779                                        expiration_override_lease_duration=2000)
1780         # make it start sooner than usual.
1781         lc = ss.lease_checker
1782         lc.slow_start = 0
1783         lc.stop_after_first_bucket = True
1784         webstatus = StorageStatus(ss)
1785
1786         # create a few shares, with some leases on them
1787         self.make_shares(ss)
1788         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1789
1790         def count_shares(si):
1791             return len(list(ss._iter_share_files(si)))
1792         def _get_sharefile(si):
1793             return list(ss._iter_share_files(si))[0]
1794         def count_leases(si):
1795             return len(list(_get_sharefile(si).get_leases()))
1796
1797         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1798         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1799         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1800         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1801         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1802         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1803         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1804         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1805
1806         # artificially crank back the expiration time on the first lease of
1807         # each share, to make it look like it expired already (age=1000s).
1808         # Some shares have an extra lease which is set to expire at the
1809         # default time in 31 days from now (age=31days). We then run the
1810         # crawler, which will expire the first lease, making some shares get
1811         # deleted and others stay alive (with one remaining lease)
1812         now = time.time()
1813
1814         sf0 = _get_sharefile(immutable_si_0)
1815         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1816         sf0_size = os.stat(sf0.home).st_size
1817
1818         # immutable_si_1 gets an extra lease
1819         sf1 = _get_sharefile(immutable_si_1)
1820         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1821
1822         sf2 = _get_sharefile(mutable_si_2)
1823         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1824         sf2_size = os.stat(sf2.home).st_size
1825
1826         # mutable_si_3 gets an extra lease
1827         sf3 = _get_sharefile(mutable_si_3)
1828         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1829
1830         ss.setServiceParent(self.s)
1831
1832         d = fireEventually()
1833         # examine the state right after the first bucket has been processed
1834         def _after_first_bucket(ignored):
1835             p = lc.get_progress()
1836             self.failUnless(p["cycle-in-progress"], p)
1837         d.addCallback(_after_first_bucket)
1838         d.addCallback(lambda ign: self.render1(webstatus))
1839         def _check_html_in_cycle(html):
1840             s = remove_tags(html)
1841             # the first bucket encountered gets deleted, and its prefix
1842             # happens to be about 1/5th of the way through the ring, so the
1843             # predictor thinks we'll have 5 shares and that we'll delete them
1844             # all. This part of the test depends upon the SIs landing right
1845             # where they do now.
1846             self.failUnlessIn("The remainder of this cycle is expected to "
1847                               "recover: 4 shares, 4 buckets", s)
1848             self.failUnlessIn("The whole cycle is expected to examine "
1849                               "5 shares in 5 buckets and to recover: "
1850                               "5 shares, 5 buckets", s)
1851         d.addCallback(_check_html_in_cycle)
1852
1853         # wait for the crawler to finish the first cycle. Two shares should
1854         # have been removed
1855         def _wait():
1856             return bool(lc.get_state()["last-cycle-finished"] is not None)
1857         d.addCallback(lambda ign: self.poll(_wait))
1858
1859         def _after_first_cycle(ignored):
1860             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1861             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1862             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1863             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1864             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1865             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1866
1867             s = lc.get_state()
1868             last = s["history"][0]
1869
1870             self.failUnlessEqual(last["expiration-enabled"], True)
1871             self.failUnlessEqual(last["configured-expiration-mode"],
1872                                  ("age", 2000, None, ("mutable", "immutable")))
1873             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1874
1875             rec = last["space-recovered"]
1876             self.failUnlessEqual(rec["examined-buckets"], 4)
1877             self.failUnlessEqual(rec["examined-shares"], 4)
1878             self.failUnlessEqual(rec["actual-buckets"], 2)
1879             self.failUnlessEqual(rec["original-buckets"], 2)
1880             self.failUnlessEqual(rec["configured-buckets"], 2)
1881             self.failUnlessEqual(rec["actual-shares"], 2)
1882             self.failUnlessEqual(rec["original-shares"], 2)
1883             self.failUnlessEqual(rec["configured-shares"], 2)
1884             size = sf0_size + sf2_size
1885             self.failUnlessEqual(rec["actual-sharebytes"], size)
1886             self.failUnlessEqual(rec["original-sharebytes"], size)
1887             self.failUnlessEqual(rec["configured-sharebytes"], size)
1888             # different platforms have different notions of "blocks used by
1889             # this file", so merely assert that it's a non-negative number
1890             self.failUnless(rec["actual-diskbytes"] >= 0,
1891                             rec["actual-diskbytes"])
1892             self.failUnless(rec["original-diskbytes"] >= 0,
1893                             rec["original-diskbytes"])
1894             self.failUnless(rec["configured-diskbytes"] >= 0,
1895                             rec["configured-diskbytes"])
1896         d.addCallback(_after_first_cycle)
1897         d.addCallback(lambda ign: self.render1(webstatus))
1898         def _check_html(html):
1899             s = remove_tags(html)
1900             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1901             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1902             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1903         d.addCallback(_check_html)
1904         return d
1905
1906     def test_expire_cutoff_date(self):
1907         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1908         fileutil.make_dirs(basedir)
1909         # setting cutoff-date to 2000 seconds ago means that any lease which
1910         # is more than 2000s old will be expired.
1911         now = time.time()
1912         then = int(now - 2000)
1913         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1914                                        expiration_enabled=True,
1915                                        expiration_mode="cutoff-date",
1916                                        expiration_cutoff_date=then)
1917         # make it start sooner than usual.
1918         lc = ss.lease_checker
1919         lc.slow_start = 0
1920         lc.stop_after_first_bucket = True
1921         webstatus = StorageStatus(ss)
1922
1923         # create a few shares, with some leases on them
1924         self.make_shares(ss)
1925         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1926
1927         def count_shares(si):
1928             return len(list(ss._iter_share_files(si)))
1929         def _get_sharefile(si):
1930             return list(ss._iter_share_files(si))[0]
1931         def count_leases(si):
1932             return len(list(_get_sharefile(si).get_leases()))
1933
1934         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1935         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1936         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1937         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1938         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1939         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1940         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1941         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1942
1943         # artificially crank back the expiration time on the first lease of
1944         # each share, to make it look like it was renewed 3000s ago. To achieve
1945         # this, we need to set the expiration time to now-3000+31days. This
1946         # will change when the lease format is improved to contain both
1947         # create/renew time and duration.
1948         new_expiration_time = now - 3000 + 31*24*60*60
1949
1950         # Some shares have an extra lease which is set to expire at the
1951         # default time in 31 days from now (age=31days). We then run the
1952         # crawler, which will expire the first lease, making some shares get
1953         # deleted and others stay alive (with one remaining lease)
1954
1955         sf0 = _get_sharefile(immutable_si_0)
1956         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1957         sf0_size = os.stat(sf0.home).st_size
1958
1959         # immutable_si_1 gets an extra lease
1960         sf1 = _get_sharefile(immutable_si_1)
1961         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1962
1963         sf2 = _get_sharefile(mutable_si_2)
1964         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1965         sf2_size = os.stat(sf2.home).st_size
1966
1967         # mutable_si_3 gets an extra lease
1968         sf3 = _get_sharefile(mutable_si_3)
1969         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1970
1971         ss.setServiceParent(self.s)
1972
1973         d = fireEventually()
1974         # examine the state right after the first bucket has been processed
1975         def _after_first_bucket(ignored):
1976             p = lc.get_progress()
1977             self.failUnless(p["cycle-in-progress"], p)
1978         d.addCallback(_after_first_bucket)
1979         d.addCallback(lambda ign: self.render1(webstatus))
1980         def _check_html_in_cycle(html):
1981             s = remove_tags(html)
1982             # the first bucket encountered gets deleted, and its prefix
1983             # happens to be about 1/5th of the way through the ring, so the
1984             # predictor thinks we'll have 5 shares and that we'll delete them
1985             # all. This part of the test depends upon the SIs landing right
1986             # where they do now.
1987             self.failUnlessIn("The remainder of this cycle is expected to "
1988                               "recover: 4 shares, 4 buckets", s)
1989             self.failUnlessIn("The whole cycle is expected to examine "
1990                               "5 shares in 5 buckets and to recover: "
1991                               "5 shares, 5 buckets", s)
1992         d.addCallback(_check_html_in_cycle)
1993
1994         # wait for the crawler to finish the first cycle. Two shares should
1995         # have been removed
1996         def _wait():
1997             return bool(lc.get_state()["last-cycle-finished"] is not None)
1998         d.addCallback(lambda ign: self.poll(_wait))
1999
2000         def _after_first_cycle(ignored):
2001             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2002             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2003             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2004             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2005             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2006             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2007
2008             s = lc.get_state()
2009             last = s["history"][0]
2010
2011             self.failUnlessEqual(last["expiration-enabled"], True)
2012             self.failUnlessEqual(last["configured-expiration-mode"],
2013                                  ("cutoff-date", None, then,
2014                                   ("mutable", "immutable")))
2015             self.failUnlessEqual(last["leases-per-share-histogram"],
2016                                  {1: 2, 2: 2})
2017
2018             rec = last["space-recovered"]
2019             self.failUnlessEqual(rec["examined-buckets"], 4)
2020             self.failUnlessEqual(rec["examined-shares"], 4)
2021             self.failUnlessEqual(rec["actual-buckets"], 2)
2022             self.failUnlessEqual(rec["original-buckets"], 0)
2023             self.failUnlessEqual(rec["configured-buckets"], 2)
2024             self.failUnlessEqual(rec["actual-shares"], 2)
2025             self.failUnlessEqual(rec["original-shares"], 0)
2026             self.failUnlessEqual(rec["configured-shares"], 2)
2027             size = sf0_size + sf2_size
2028             self.failUnlessEqual(rec["actual-sharebytes"], size)
2029             self.failUnlessEqual(rec["original-sharebytes"], 0)
2030             self.failUnlessEqual(rec["configured-sharebytes"], size)
2031             # different platforms have different notions of "blocks used by
2032             # this file", so merely assert that it's a non-negative number
2033             self.failUnless(rec["actual-diskbytes"] >= 0,
2034                             rec["actual-diskbytes"])
2035             self.failUnless(rec["original-diskbytes"] >= 0,
2036                             rec["original-diskbytes"])
2037             self.failUnless(rec["configured-diskbytes"] >= 0,
2038                             rec["configured-diskbytes"])
2039         d.addCallback(_after_first_cycle)
2040         d.addCallback(lambda ign: self.render1(webstatus))
2041         def _check_html(html):
2042             s = remove_tags(html)
2043             self.failUnlessIn("Expiration Enabled:"
2044                               " expired leases will be removed", s)
2045             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2046             substr = "Leases created or last renewed before %s will be considered expired." % date
2047             self.failUnlessIn(substr, s)
2048             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2049         d.addCallback(_check_html)
2050         return d
2051
2052     def test_only_immutable(self):
2053         basedir = "storage/LeaseCrawler/only_immutable"
2054         fileutil.make_dirs(basedir)
2055         now = time.time()
2056         then = int(now - 2000)
2057         ss = StorageServer(basedir, "\x00" * 20,
2058                            expiration_enabled=True,
2059                            expiration_mode="cutoff-date",
2060                            expiration_cutoff_date=then,
2061                            expiration_sharetypes=("immutable",))
2062         lc = ss.lease_checker
2063         lc.slow_start = 0
2064         webstatus = StorageStatus(ss)
2065
2066         self.make_shares(ss)
2067         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2068         # set all leases to be expirable
2069         new_expiration_time = now - 3000 + 31*24*60*60
2070
2071         def count_shares(si):
2072             return len(list(ss._iter_share_files(si)))
2073         def _get_sharefile(si):
2074             return list(ss._iter_share_files(si))[0]
2075         def count_leases(si):
2076             return len(list(_get_sharefile(si).get_leases()))
2077
2078         sf0 = _get_sharefile(immutable_si_0)
2079         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2080         sf1 = _get_sharefile(immutable_si_1)
2081         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2082         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2083         sf2 = _get_sharefile(mutable_si_2)
2084         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2085         sf3 = _get_sharefile(mutable_si_3)
2086         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2087         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2088
2089         ss.setServiceParent(self.s)
2090         def _wait():
2091             return bool(lc.get_state()["last-cycle-finished"] is not None)
2092         d = self.poll(_wait)
2093
2094         def _after_first_cycle(ignored):
2095             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2096             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2097             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2098             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2099             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2100             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2101         d.addCallback(_after_first_cycle)
2102         d.addCallback(lambda ign: self.render1(webstatus))
2103         def _check_html(html):
2104             s = remove_tags(html)
2105             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2106         d.addCallback(_check_html)
2107         return d
2108
2109     def test_only_mutable(self):
2110         basedir = "storage/LeaseCrawler/only_mutable"
2111         fileutil.make_dirs(basedir)
2112         now = time.time()
2113         then = int(now - 2000)
2114         ss = StorageServer(basedir, "\x00" * 20,
2115                            expiration_enabled=True,
2116                            expiration_mode="cutoff-date",
2117                            expiration_cutoff_date=then,
2118                            expiration_sharetypes=("mutable",))
2119         lc = ss.lease_checker
2120         lc.slow_start = 0
2121         webstatus = StorageStatus(ss)
2122
2123         self.make_shares(ss)
2124         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2125         # set all leases to be expirable
2126         new_expiration_time = now - 3000 + 31*24*60*60
2127
2128         def count_shares(si):
2129             return len(list(ss._iter_share_files(si)))
2130         def _get_sharefile(si):
2131             return list(ss._iter_share_files(si))[0]
2132         def count_leases(si):
2133             return len(list(_get_sharefile(si).get_leases()))
2134
2135         sf0 = _get_sharefile(immutable_si_0)
2136         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2137         sf1 = _get_sharefile(immutable_si_1)
2138         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2139         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2140         sf2 = _get_sharefile(mutable_si_2)
2141         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2142         sf3 = _get_sharefile(mutable_si_3)
2143         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2144         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2145
2146         ss.setServiceParent(self.s)
2147         def _wait():
2148             return bool(lc.get_state()["last-cycle-finished"] is not None)
2149         d = self.poll(_wait)
2150
2151         def _after_first_cycle(ignored):
2152             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2153             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2154             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2155             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2156             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2157             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2158         d.addCallback(_after_first_cycle)
2159         d.addCallback(lambda ign: self.render1(webstatus))
2160         def _check_html(html):
2161             s = remove_tags(html)
2162             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2163         d.addCallback(_check_html)
2164         return d
2165
2166     def test_bad_mode(self):
2167         basedir = "storage/LeaseCrawler/bad_mode"
2168         fileutil.make_dirs(basedir)
2169         e = self.failUnlessRaises(ValueError,
2170                                   StorageServer, basedir, "\x00" * 20,
2171                                   expiration_mode="bogus")
2172         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2173
2174     def test_parse_duration(self):
2175         DAY = 24*60*60
2176         MONTH = 31*DAY
2177         YEAR = 365*DAY
2178         p = time_format.parse_duration
2179         self.failUnlessEqual(p("7days"), 7*DAY)
2180         self.failUnlessEqual(p("31day"), 31*DAY)
2181         self.failUnlessEqual(p("60 days"), 60*DAY)
2182         self.failUnlessEqual(p("2mo"), 2*MONTH)
2183         self.failUnlessEqual(p("3 month"), 3*MONTH)
2184         self.failUnlessEqual(p("2years"), 2*YEAR)
2185         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2186         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2187
2188     def test_parse_date(self):
2189         p = time_format.parse_date
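        # 1237334400 == 2009-03-18 00:00:00 UTC (exactly 14321 days * 86400 s)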
2190         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2191         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2192
2193     def test_limited_history(self):
2194         basedir = "storage/LeaseCrawler/limited_history"
2195         fileutil.make_dirs(basedir)
2196         ss = StorageServer(basedir, "\x00" * 20)
2197         # make it start sooner than usual.
2198         lc = ss.lease_checker
2199         lc.slow_start = 0
2200         lc.cpu_slice = 500
2201
2202         # create a few shares, with some leases on them
2203         self.make_shares(ss)
2204
2205         ss.setServiceParent(self.s)
2206
2207         def _wait_until_15_cycles_done():
2208             last = lc.state["last-cycle-finished"]
2209             if last is not None and last >= 15:
2210                 return True
2211             if lc.timer:
2212                 lc.timer.reset(0)
2213             return False
2214         d = self.poll(_wait_until_15_cycles_done)
2215
2216         def _check(ignored):
2217             s = lc.get_state()
2218             h = s["history"]
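            # only the 10 most recent cycles are retained, so after cycle 15
            # finishes, the history should hold cycles 6 through 15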
2219             self.failUnlessEqual(len(h), 10)
2220             self.failUnlessEqual(max(h.keys()), 15)
2221             self.failUnlessEqual(min(h.keys()), 6)
2222         d.addCallback(_check)
2223         return d
2224
2225     def test_unpredictable_future(self):
2226         basedir = "storage/LeaseCrawler/unpredictable_future"
2227         fileutil.make_dirs(basedir)
2228         ss = StorageServer(basedir, "\x00" * 20)
2229         # make it start sooner than usual.
2230         lc = ss.lease_checker
2231         lc.slow_start = 0
2232         lc.cpu_slice = -1.0 # stop quickly
2233
2234         self.make_shares(ss)
2235
2236         ss.setServiceParent(self.s)
2237
2238         d = fireEventually()
2239         def _check(ignored):
2240             # this should fire after the first bucket is complete, but before
2241             # the first prefix is complete, so the progress-measurer won't
2242             # think we've gotten far enough to raise our percent-complete
2243             # above 0%, triggering the cannot-predict-the-future code in
2244             # expirer.py . This will have to change if/when the
2245             # progress-measurer gets smart enough to count buckets (we'll
2246             # have to interrupt it even earlier, before it's finished the
2247             # first bucket).
2248             s = lc.get_state()
2249             self.failUnlessIn("cycle-to-date", s)
2250             self.failUnlessIn("estimated-remaining-cycle", s)
2251             self.failUnlessIn("estimated-current-cycle", s)
2252
2253             left = s["estimated-remaining-cycle"]["space-recovered"]
2254             self.failUnlessEqual(left["actual-buckets"], None)
2255             self.failUnlessEqual(left["original-buckets"], None)
2256             self.failUnlessEqual(left["configured-buckets"], None)
2257             self.failUnlessEqual(left["actual-shares"], None)
2258             self.failUnlessEqual(left["original-shares"], None)
2259             self.failUnlessEqual(left["configured-shares"], None)
2260             self.failUnlessEqual(left["actual-diskbytes"], None)
2261             self.failUnlessEqual(left["original-diskbytes"], None)
2262             self.failUnlessEqual(left["configured-diskbytes"], None)
2263             self.failUnlessEqual(left["actual-sharebytes"], None)
2264             self.failUnlessEqual(left["original-sharebytes"], None)
2265             self.failUnlessEqual(left["configured-sharebytes"], None)
2266
2267             full = s["estimated-current-cycle"]["space-recovered"]
2268             self.failUnlessEqual(full["actual-buckets"], None)
2269             self.failUnlessEqual(full["original-buckets"], None)
2270             self.failUnlessEqual(full["configured-buckets"], None)
2271             self.failUnlessEqual(full["actual-shares"], None)
2272             self.failUnlessEqual(full["original-shares"], None)
2273             self.failUnlessEqual(full["configured-shares"], None)
2274             self.failUnlessEqual(full["actual-diskbytes"], None)
2275             self.failUnlessEqual(full["original-diskbytes"], None)
2276             self.failUnlessEqual(full["configured-diskbytes"], None)
2277             self.failUnlessEqual(full["actual-sharebytes"], None)
2278             self.failUnlessEqual(full["original-sharebytes"], None)
2279             self.failUnlessEqual(full["configured-sharebytes"], None)
2280
2281         d.addCallback(_check)
2282         return d
2283
2284     def test_no_st_blocks(self):
2285         basedir = "storage/LeaseCrawler/no_st_blocks"
2286         fileutil.make_dirs(basedir)
2287         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2288                                         expiration_mode="age",
2289                                         expiration_override_lease_duration=-1000)
2290         # a negative expiration_override_lease_duration means the "configured-"
2291         # space-recovered counts will be non-zero, since all shares will have
2292         # expired by then
2293
2294         # make it start sooner than usual.
2295         lc = ss.lease_checker
2296         lc.slow_start = 0
2297
2298         self.make_shares(ss)
2299         ss.setServiceParent(self.s)
2300         def _wait():
2301             return bool(lc.get_state()["last-cycle-finished"] is not None)
2302         d = self.poll(_wait)
2303
2304         def _check(ignored):
2305             s = lc.get_state()
2306             last = s["history"][0]
2307             rec = last["space-recovered"]
2308             self.failUnlessEqual(rec["configured-buckets"], 4)
2309             self.failUnlessEqual(rec["configured-shares"], 4)
2310             self.failUnless(rec["configured-sharebytes"] > 0,
2311                             rec["configured-sharebytes"])
2312             # without the .st_blocks field in os.stat() results, we should be
2313             # reporting diskbytes==sharebytes
2314             self.failUnlessEqual(rec["configured-sharebytes"],
2315                                  rec["configured-diskbytes"])
2316         d.addCallback(_check)
2317         return d
2318
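    # For reference, a sketch of the fallback that test_no_st_blocks relies
    # on (hypothetical; the real accounting lives in the lease crawler).
    # st_blocks counts 512-byte blocks on platforms that provide it; on
    # platforms without it, the share's apparent size has to stand in for
    # its disk usage, which is why diskbytes==sharebytes above:
    def _disk_usage_of(self, sharefile):
        s = os.stat(sharefile)
        if hasattr(s, "st_blocks"):
            return s.st_blocks * 512
        return s.st_size
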
2319     def test_share_corruption(self):
2320         self._poll_should_ignore_these_errors = [
2321             UnknownMutableContainerVersionError,
2322             UnknownImmutableContainerVersionError,
2323             ]
2324         basedir = "storage/LeaseCrawler/share_corruption"
2325         fileutil.make_dirs(basedir)
2326         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2327         w = StorageStatus(ss)
2328         # make it start sooner than usual.
2329         lc = ss.lease_checker
2330         lc.stop_after_first_bucket = True
2331         lc.slow_start = 0
2332         lc.cpu_slice = 500
2333
2334         # create a few shares, with some leases on them
2335         self.make_shares(ss)
2336
2337         # now corrupt one, and make sure the lease-checker keeps going
2338         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2339         first = min(self.sis)
2340         first_b32 = base32.b2a(first)
2341         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2342         f = open(fn, "rb+")
2343         f.seek(0)
2344         f.write("BAD MAGIC")
2345         f.close()
2346         # if get_share_file() doesn't see the correct mutable magic, it
2347         # assumes the file is an immutable share, and then
2348         # immutable.ShareFile sees a bad version. So regardless of which kind
2349         # of share we corrupted, this will trigger an
2350         # UnknownImmutableContainerVersionError.
2351
2352         # also create an empty bucket
2353         empty_si = "\x04"*16  # binary SI, as storage_index_to_dir expects
2354         empty_bucket_dir = os.path.join(ss.sharedir,
2355                                         storage_index_to_dir(empty_si))
2356         fileutil.make_dirs(empty_bucket_dir)
2357
2358         ss.setServiceParent(self.s)
2359
2360         d = fireEventually()
2361
2362         # now examine the state right after the first bucket has been
2363         # processed.
2364         def _after_first_bucket(ignored):
2365             so_far = lc.get_state()["cycle-to-date"]
2366             rec = so_far["space-recovered"]
2367             self.failUnlessEqual(rec["examined-buckets"], 1)
2368             self.failUnlessEqual(rec["examined-shares"], 0)
2369             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2370         d.addCallback(_after_first_bucket)
2371
2372         d.addCallback(lambda ign: self.render_json(w))
2373         def _check_json(json):
2374             data = simplejson.loads(json)
2375             # grr. json turns all dict keys into strings.
2376             so_far = data["lease-checker"]["cycle-to-date"]
2377             corrupt_shares = so_far["corrupt-shares"]
2378             # it also turns all tuples into lists
2379             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2380         d.addCallback(_check_json)
2381         d.addCallback(lambda ign: self.render1(w))
2382         def _check_html(html):
2383             s = remove_tags(html)
2384             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2385         d.addCallback(_check_html)
2386
2387         def _wait():
2388             return lc.get_state()["last-cycle-finished"] is not None
2389         d.addCallback(lambda ign: self.poll(_wait))
2390
2391         def _after_first_cycle(ignored):
2392             s = lc.get_state()
2393             last = s["history"][0]
2394             rec = last["space-recovered"]
2395             self.failUnlessEqual(rec["examined-buckets"], 5)
2396             self.failUnlessEqual(rec["examined-shares"], 3)
2397             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2398         d.addCallback(_after_first_cycle)
2399         d.addCallback(lambda ign: self.render_json(w))
2400         def _check_json_history(json):
2401             data = simplejson.loads(json)
2402             last = data["lease-checker"]["history"]["0"]
2403             corrupt_shares = last["corrupt-shares"]
2404             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2405         d.addCallback(_check_json_history)
2406         d.addCallback(lambda ign: self.render1(w))
2407         def _check_html_history(html):
2408             s = remove_tags(html)
2409             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2410         d.addCallback(_check_html_history)
2411
2412         def _cleanup(res):
2413             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2414                                    UnknownImmutableContainerVersionError)
2415             return res
2416         d.addBoth(_cleanup)
2417         return d
2418
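    # A sketch of the dispatch that the comment in test_share_corruption
    # describes (hypothetical; the real version is
    # allmydata.storage.shares.get_share_file). Mutable containers are
    # recognized solely by a 32-byte magic prefix, so clobbering the start
    # of either kind of share sends it down the immutable path, where the
    # bogus version field surfaces as UnknownImmutableContainerVersionError:
    def _classify_share(self, filename):
        f = open(filename, "rb")
        prefix = f.read(32)
        f.close()
        if prefix == MutableShareFile.MAGIC:
            return "mutable"
        return "immutable"
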
2419     def render_json(self, page):
2420         d = self.render1(page, args={"t": ["json"]})
2421         return d
2422
2423 class NoDiskStatsServer(StorageServer):
2424     def get_disk_stats(self):
2425         raise AttributeError
2426
2427 class BadDiskStatsServer(StorageServer):
2428     def get_disk_stats(self):
2429         raise OSError
2430
2431 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2432
2433     def setUp(self):
2434         self.s = service.MultiService()
2435         self.s.startService()
2436     def tearDown(self):
2437         return self.s.stopService()
2438
2439     def test_no_server(self):
2440         w = StorageStatus(None)
2441         html = w.renderSynchronously()
2442         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
2443
2444     def test_status(self):
2445         basedir = "storage/WebStatus/status"
2446         fileutil.make_dirs(basedir)
2447         ss = StorageServer(basedir, "\x00" * 20)
2448         ss.setServiceParent(self.s)
2449         w = StorageStatus(ss)
2450         d = self.render1(w)
2451         def _check_html(html):
2452             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2453             s = remove_tags(html)
2454             self.failUnlessIn("Accepting new shares: Yes", s)
2455             self.failUnlessIn("Reserved space: - 0 B (0)", s)
2456         d.addCallback(_check_html)
2457         d.addCallback(lambda ign: self.render_json(w))
2458         def _check_json(json):
2459             data = simplejson.loads(json)
2460             s = data["stats"]
2461             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2462             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2463             self.failUnlessIn("bucket-counter", data)
2464             self.failUnlessIn("lease-checker", data)
2465         d.addCallback(_check_json)
2466         return d
2467
2468     def render_json(self, page):
2469         d = self.render1(page, args={"t": ["json"]})
2470         return d
2471
2472     def test_status_no_disk_stats(self):
2473         # Some platforms have no disk-stats API at all. Make sure the code
2474         # can handle that (this test runs on all platforms).
2475         basedir = "storage/WebStatus/status_no_disk_stats"
2476         fileutil.make_dirs(basedir)
2477         ss = NoDiskStatsServer(basedir, "\x00" * 20)
2478         ss.setServiceParent(self.s)
2479         w = StorageStatus(ss)
2480         html = w.renderSynchronously()
2481         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2482         s = remove_tags(html)
2483         self.failUnlessIn("Accepting new shares: Yes", s)
2484         self.failUnlessIn("Total disk space: ?", s)
2485         self.failUnlessIn("Space Available to Tahoe: ?", s)
2486         self.failUnless(ss.get_available_space() is None)
2487
2488     def test_status_bad_disk_stats(self):
2489         # If the API to get disk stats exists but a call to it fails, then the status should
2490         # show that no shares will be accepted, and get_available_space() should be 0.
2491         basedir = "storage/WebStatus/status_bad_disk_stats"
2492         fileutil.make_dirs(basedir)
2493         ss = BadDiskStatsServer(basedir, "\x00" * 20)
2494         ss.setServiceParent(self.s)
2495         w = StorageStatus(ss)
2496         html = w.renderSynchronously()
2497         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2498         s = remove_tags(html)
2499         self.failUnlessIn("Accepting new shares: No", s)
2500         self.failUnlessIn("Total disk space: ?", s)
2501         self.failUnlessIn("Space Available to Tahoe: ?", s)
2502         self.failUnlessEqual(ss.get_available_space(), 0)
2503
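    # Taken together, the two tests above pin down the disk-stats fallback
    # policy: a missing API (AttributeError) leaves available space unknown
    # and the server still accepting shares, while a failing API (OSError)
    # makes it refuse new shares outright. A hypothetical sketch of that
    # policy (not the real implementation; it assumes get_disk_stats()
    # returns a dict with an "avail" entry, as the StorageServer's does):
    def _policy_available_space(self, server):
        try:
            stats = server.get_disk_stats()
        except AttributeError:
            return None # unknown, but optimistically keep accepting shares
        except EnvironmentError:
            return 0    # the stats exist but fail: accept nothing
        return stats["avail"]
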
2504     def test_readonly(self):
2505         basedir = "storage/WebStatus/readonly"
2506         fileutil.make_dirs(basedir)
2507         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2508         ss.setServiceParent(self.s)
2509         w = StorageStatus(ss)
2510         html = w.renderSynchronously()
2511         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2512         s = remove_tags(html)
2513         self.failUnlessIn("Accepting new shares: No", s)
2514
2515     def test_reserved(self):
2516         basedir = "storage/WebStatus/reserved"
2517         fileutil.make_dirs(basedir)
2518         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2519         ss.setServiceParent(self.s)
2520         w = StorageStatus(ss)
2521         html = w.renderSynchronously()
2522         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2523         s = remove_tags(html)
2524         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2525
2526     def test_huge_reserved(self):
2527         basedir = "storage/WebStatus/huge_reserved"
2528         fileutil.make_dirs(basedir)
2529         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15000e6)
2530         ss.setServiceParent(self.s)
2531         w = StorageStatus(ss)
2532         html = w.renderSynchronously()
2533         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2534         s = remove_tags(html)
2535         self.failUnlessIn("Reserved space: - 15.00 GB (15000000000)", s)
2536
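    # The "Reserved space" figure above is just the configured value echoed
    # back; what the server actually offers is the filesystem's available
    # space minus the reservation, floored at zero. A hypothetical sketch of
    # that arithmetic:
    def _space_available_to_tahoe(self, disk_avail, reserved):
        remaining = disk_avail - reserved
        if remaining < 0:
            remaining = 0
        return remaining
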
2537     def test_util(self):
2538         w = StorageStatus(None)
2539         self.failUnlessEqual(w.render_space(None, None), "?")
2540         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2541         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2542         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2543         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2544         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2545
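    # Note that the abbreviations above use decimal (SI) units: 10e6 bytes
    # renders as "10.00 MB" (10**6-based), not the ~9.54 "MiB" a 2**20-based
    # convention would produce. A hypothetical one-case sketch:
    def _abbrev_mb(self, n):
        return "%.2f MB" % (n / 1e6)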