
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
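# FakeCanary stands in for the Foolscap "canary" object that a real client
# connection would provide: the storage server registers disconnect
# callbacks on it via notifyOnDisconnect(), and tests can fire those
# callbacks by hand to simulate a client disconnecting mid-upload.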
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
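    # A trivial stand-in for a Foolscap RemoteReference: callRemote()
    # dispatches to the matching remote_* method on self.target and wraps
    # the result with maybeDeferred, so the bucket-proxy classes can be
    # exercised without a network connection.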

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
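        #
        # (Sanity-checking that arithmetic: the v1 header is 0x24 == 36
        # bytes and the v2 header is 0x44 == 68 bytes; the three 7-node
        # hash trees contribute 3*7*32 == 672 bytes, the share hash chain
        # 3*(2+32) == 102 bytes, and the uri_extension adds a 4-byte length
        # prefix plus 500 bytes of data. So 36+100+672+102+504 == 1414 and
        # 68+100+672+102+504 == 1446.)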

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

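# A StorageServer whose view of free disk space is under test control:
# get_disk_stats() reports whatever the test stores in DISKAVAIL instead
# of measuring the real filesystem.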
class FakeDiskStorageServer(StorageServer):
    DISKAVAIL = 0
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL,
                 'avail': max(self.DISKAVAIL - self.reserved_space, 0) }

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but is not the
        exact same storage index) will not add an entry to the share
        directory.
        """
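        # (Note on the setup below: storage_index_to_dir() buckets shares
        # under a directory named by a short prefix of the base32-encoded
        # storage index. "storageindex" and "storageindey" differ only in
        # their final byte, so both should land in the same prefix
        # directory, which is exactly the collision this test needs.)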
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnless("BucketReader" in b_str, b_str)
        self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnless(" had version 0 but we wanted 1" in str(e), e)

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
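        # (OVERHEAD presumably covers the three 4-byte fields of the
        # immutable share file's header; LEASE_SIZE matches one lease
        # record: a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time, 72 bytes in all.)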
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually reduce DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_disk_stats(self):
        # This will spuriously fail if there is zero disk space left (but so will other tests).
        ss = self.create("test_disk_stats", reserved_space=0)

        disk = ss.get_disk_stats()
        self.failUnless(disk['total'] > 0, disk['total'])
        self.failUnless(disk['used'] > 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)

        disk = ss.get_disk_stats()
        self.failUnlessEqual(disk['avail'], 0)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
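        # Each share's entry below is a (testv, writev, new_length) tuple:
        # an empty test vector always passes, an empty write vector writes
        # nothing, and new_length=None leaves the container size alone.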
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

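    # Two lease-comparison helpers follow: renewing a lease rewrites its
    # expiration_time, so checks made after a renew use the variant that
    # ignores timestamps.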
1086     def compare_leases_without_timestamps(self, leases_a, leases_b):
1087         self.failUnlessEqual(len(leases_a), len(leases_b))
1088         for i in range(len(leases_a)):
1089             a = leases_a[i]
1090             b = leases_b[i]
1091             self.failUnlessEqual(a.owner_num,       b.owner_num)
1092             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1093             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1094             self.failUnlessEqual(a.nodeid,          b.nodeid)
1095
1096     def compare_leases(self, leases_a, leases_b):
1097         self.failUnlessEqual(len(leases_a), len(leases_b))
1098         for i in range(len(leases_a)):
1099             a = leases_a[i]
1100             b = leases_b[i]
1101             self.failUnlessEqual(a.owner_num,       b.owner_num)
1102             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1103             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1104             self.failUnlessEqual(a.nodeid,          b.nodeid)
1105             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1106
1107     def test_leases(self):
1108         ss = self.create("test_leases")
1109         def secrets(n):
1110             return ( self.write_enabler("we1"),
1111                      self.renew_secret("we1-%d" % n),
1112                      self.cancel_secret("we1-%d" % n) )
1113         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1114         write = ss.remote_slot_testv_and_readv_and_writev
1115         read = ss.remote_slot_readv
1116         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1117         self.failUnlessEqual(rc, (True, {}))
1118
1119         # create a random non-numeric file in the bucket directory, to
1120         # exercise the code that's supposed to ignore those.
1121         bucket_dir = os.path.join(self.workdir("test_leases"),
1122                                   "shares", storage_index_to_dir("si1"))
1123         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1124         f.write("you ought to be ignoring me\n")
1125         f.close()
1126
1127         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1128         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1129
1130         # add-lease on a missing storage index is silently ignored
1131         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1132
1133         # re-allocate the slots and use the same secrets, that should update
1134         # the lease
1135         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1136         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1137
1138         # renew it directly
1139         ss.remote_renew_lease("si1", secrets(0)[1])
1140         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1141
1142         # now allocate them with a bunch of different secrets, to trigger the
1143         # extended lease code. Use add_lease for one of them.
1144         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1145         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1146         secrets2 = secrets(2)
1147         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1148         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1149         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1150         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1151         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1152
1153         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1154
1155         # cancel one of them
1156         ss.remote_cancel_lease("si1", secrets(5)[2])
1157         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1158
1159         all_leases = list(s0.get_leases())
1160         # and write enough data to expand the container, forcing the server
1161         # to move the leases
1162         write("si1", secrets(0),
1163               {0: ([], [(0,data)], 200), },
1164               [])
1165
1166         # read back the leases, make sure they're still intact.
1167         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1168
1169         ss.remote_renew_lease("si1", secrets(0)[1])
1170         ss.remote_renew_lease("si1", secrets(1)[1])
1171         ss.remote_renew_lease("si1", secrets(2)[1])
1172         ss.remote_renew_lease("si1", secrets(3)[1])
1173         ss.remote_renew_lease("si1", secrets(4)[1])
1174         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1175         # get a new copy of the leases, with the current timestamps. Reading
1176         # data and failing to renew/cancel leases should leave the timestamps
1177         # alone.
1178         all_leases = list(s0.get_leases())
1179         # renewing with a bogus token should prompt an error message
1180
1181         # examine the exception thus raised, make sure the old nodeid is
1182         # present, to provide for share migration
1183         e = self.failUnlessRaises(IndexError,
1184                                   ss.remote_renew_lease, "si1",
1185                                   secrets(20)[1])
1186         e_s = str(e)
1187         self.failUnless("Unable to renew non-existent lease" in e_s)
1188         self.failUnless("I have leases accepted by nodeids:" in e_s)
1189         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1190
1191         # same for cancelling
1192         self.failUnlessRaises(IndexError,
1193                               ss.remote_cancel_lease, "si1",
1194                               secrets(20)[2])
1195         self.compare_leases(all_leases, list(s0.get_leases()))
1196
1197         # reading shares should not modify the timestamp
1198         read("si1", [], [(0,200)])
1199         self.compare_leases(all_leases, list(s0.get_leases()))
1200
1201         write("si1", secrets(0),
1202               {0: ([], [(200, "make me bigger")], None)}, [])
1203         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1204
1205         write("si1", secrets(0),
1206               {0: ([], [(500, "make me really bigger")], None)}, [])
1207         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1208
1209         # now cancel them all
1210         ss.remote_cancel_lease("si1", secrets(0)[2])
1211         ss.remote_cancel_lease("si1", secrets(1)[2])
1212         ss.remote_cancel_lease("si1", secrets(2)[2])
1213         ss.remote_cancel_lease("si1", secrets(3)[2])
1214
1215         # the slot should still be there
1216         remaining_shares = read("si1", [], [(0,10)])
1217         self.failUnlessEqual(len(remaining_shares), 1)
1218         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1219
1220         # cancelling a non-existent lease should raise an IndexError
1221         self.failUnlessRaises(IndexError,
1222                               ss.remote_cancel_lease, "si1", "nonsecret")
1223
1224         # and the slot should still be there
1225         remaining_shares = read("si1", [], [(0,10)])
1226         self.failUnlessEqual(len(remaining_shares), 1)
1227         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1228
1229         ss.remote_cancel_lease("si1", secrets(4)[2])
1230         # now the slot should be gone
1231         no_shares = read("si1", [], [(0,10)])
1232         self.failUnlessEqual(no_shares, {})
1233
1234         # cancelling a lease on a non-existent share should raise an IndexError
1235         self.failUnlessRaises(IndexError,
1236                               ss.remote_cancel_lease, "si2", "nonsecret")
1237
1238     def test_remove(self):
1239         ss = self.create("test_remove")
1240         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1241                       set([0,1,2]), 100)
1242         readv = ss.remote_slot_readv
1243         writev = ss.remote_slot_testv_and_readv_and_writev
1244         secrets = ( self.write_enabler("we1"),
1245                     self.renew_secret("we1"),
1246                     self.cancel_secret("we1") )
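        # (idiom under test: a writev entry of ([], [], 0) -- empty test
        # vector, no writes, new_length=0 -- deletes that share; removing
        # the last share should also remove the bucket directory)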
1247         # delete sh0 by setting its size to zero
1248         answer = writev("si1", secrets,
1249                         {0: ([], [], 0)},
1250                         [])
1251         # the answer should mention all the shares that existed before the
1252         # write
1253         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1254         # but a new read should show only sh1 and sh2
1255         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1256                              {1: [""], 2: [""]})
1257
1258         # delete sh1 by setting its size to zero
1259         answer = writev("si1", secrets,
1260                         {1: ([], [], 0)},
1261                         [])
1262         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1263         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1264                              {2: [""]})
1265
1266         # delete sh2 by setting its size to zero
1267         answer = writev("si1", secrets,
1268                         {2: ([], [], 0)},
1269                         [])
1270         self.failUnlessEqual(answer, (True, {2:[]}) )
1271         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1272                              {})
1273         # and the bucket directory should now be gone
1274         si = base32.b2a("si1")
1275         # note: this is a detail of the storage server implementation, and
1276         # may change in the future
1277         prefix = si[:2]
1278         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1279         bucketdir = os.path.join(prefixdir, si)
1280         self.failUnless(os.path.exists(prefixdir))
1281         self.failIf(os.path.exists(bucketdir))
1282
1283 class Stats(unittest.TestCase):
1284
1285     def setUp(self):
1286         self.sparent = LoggingServiceParent()
1287         self._lease_secret = itertools.count()
1288     def tearDown(self):
1289         return self.sparent.stopService()
1290
1291     def workdir(self, name):
1292         basedir = os.path.join("storage", "Server", name)
1293         return basedir
1294
1295     def create(self, name):
1296         workdir = self.workdir(name)
1297         ss = StorageServer(workdir, "\x00" * 20)
1298         ss.setServiceParent(self.sparent)
1299         return ss
1300
1301     def test_latencies(self):
1302         ss = self.create("test_latencies")
1303         for i in range(10000):
1304             ss.add_latency("allocate", 1.0 * i)
1305         for i in range(1000):
1306             ss.add_latency("renew", 1.0 * i)
1307         for i in range(10):
1308             ss.add_latency("cancel", 2.0 * i)
1309         ss.add_latency("get", 5.0)
1310
1311         output = ss.get_latencies()
1312
1313         self.failUnlessEqual(sorted(output.keys()),
1314                              sorted(["allocate", "renew", "cancel", "get"]))
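        # the server evidently retains only the most recent 1000 samples per
        # category: 10000 "allocate" calls leave samples 9000..9999, hence
        # the mean of ~9500 and the percentiles below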
1315         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1316         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1317         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1318         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1319         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1320         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1321         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1322         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1323         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1324
1325         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1326         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1327         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1328         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1329         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1330         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1331         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1332         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1333         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1334
1335         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1336         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1337         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1338         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1339         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1340         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1341         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1342         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1343         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1344
1345         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1346         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1347         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1348         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1349         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1350         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1351         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1352         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1353         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1354
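# A minimal percentile sketch consistent with the assertions above
# (illustrative only; the real computation lives in the server's
# get_latencies). It assumes the simple sorted-sample, index = fraction*N
# rule:
def example_percentile(samples, fraction):
    # e.g. with the "cancel" samples above (0,2,..,18), fraction=0.5 picks
    # index 5, i.e. the value 10 asserted as the 50_0_percentile
    ordered = sorted(samples)
    return ordered[int(fraction * len(ordered))]
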
1355 def remove_tags(s):
1356     s = re.sub(r'<[^>]*>', ' ', s)
1357     s = re.sub(r'\s+', ' ', s)
1358     return s
1359
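# MyBucketCountingCrawler fires one Deferred from self.hook_ds each time a
# prefix finishes, so the ETA test below can watch the crawler mid-cycle.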
1360 class MyBucketCountingCrawler(BucketCountingCrawler):
1361     def finished_prefix(self, cycle, prefix):
1362         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1363         if self.hook_ds:
1364             d = self.hook_ds.pop(0)
1365             d.callback(None)
1366
1367 class MyStorageServer(StorageServer):
1368     def add_bucket_counter(self):
1369         statefile = os.path.join(self.storedir, "bucket_counter.state")
1370         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1371         self.bucket_counter.setServiceParent(self)
1372
1373 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1374
1375     def setUp(self):
1376         self.s = service.MultiService()
1377         self.s.startService()
1378     def tearDown(self):
1379         return self.s.stopService()
1380
1381     def test_bucket_counter(self):
1382         basedir = "storage/BucketCounter/bucket_counter"
1383         fileutil.make_dirs(basedir)
1384         ss = StorageServer(basedir, "\x00" * 20)
1385         # to make sure we capture the bucket-counting-crawler in the middle
1386         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1387         # also make it start sooner than usual.
1388         ss.bucket_counter.slow_start = 0
1389         orig_cpu_slice = ss.bucket_counter.cpu_slice
1390         ss.bucket_counter.cpu_slice = 0
1391         ss.setServiceParent(self.s)
1392
1393         w = StorageStatus(ss)
1394
1395         # this sample is before the crawler has started doing anything
1396         html = w.renderSynchronously()
1397         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1398         s = remove_tags(html)
1399         self.failUnless("Accepting new shares: Yes" in s, s)
1400         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1401         self.failUnless("Total buckets: Not computed yet" in s, s)
1402         self.failUnless("Next crawl in" in s, s)
1403
1404         # give the bucket-counting-crawler one tick to get started. The
1405         # cpu_slice=0 will force it to yield right after it processes the
1406         # first prefix
1407
1408         d = fireEventually()
1409         def _check(ignored):
1410             # are we really right after the first prefix?
1411             state = ss.bucket_counter.get_state()
1412             self.failUnlessEqual(state["last-complete-prefix"],
1413                                  ss.bucket_counter.prefixes[0])
1414             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1415             html = w.renderSynchronously()
1416             s = remove_tags(html)
1417             self.failUnless(" Current crawl " in s, s)
1418             self.failUnless(" (next work in " in s, s)
1419         d.addCallback(_check)
1420
1421         # now give it enough time to complete a full cycle
1422         def _watch():
1423             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1424         d.addCallback(lambda ignored: self.poll(_watch))
1425         def _check2(ignored):
1426             ss.bucket_counter.cpu_slice = orig_cpu_slice
1427             html = w.renderSynchronously()
1428             s = remove_tags(html)
1429             self.failUnless("Total buckets: 0 (the number of" in s, s)
1430             self.failUnless("Next crawl in 59 minutes" in s, s)
1431         d.addCallback(_check2)
1432         return d
1433
1434     def test_bucket_counter_cleanup(self):
1435         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1436         fileutil.make_dirs(basedir)
1437         ss = StorageServer(basedir, "\x00" * 20)
1438         # to make sure we capture the bucket-counting-crawler in the middle
1439         # of a cycle, we reach in and reduce its maximum slice time to 0.
1440         ss.bucket_counter.slow_start = 0
1441         orig_cpu_slice = ss.bucket_counter.cpu_slice
1442         ss.bucket_counter.cpu_slice = 0
1443         ss.setServiceParent(self.s)
1444
1445         d = fireEventually()
1446
1447         def _after_first_prefix(ignored):
1448             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1449             # now sneak in and mess with its state, to make sure it cleans up
1450             # properly at the end of the cycle
1451             state = ss.bucket_counter.state
1452             self.failUnlessEqual(state["last-complete-prefix"],
1453                                  ss.bucket_counter.prefixes[0])
1454             state["bucket-counts"][-12] = {}
1455             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1456             ss.bucket_counter.save_state()
1457         d.addCallback(_after_first_prefix)
1458
1459         # now give it enough time to complete a cycle
1460         def _watch():
1461             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1462         d.addCallback(lambda ignored: self.poll(_watch))
1463         def _check2(ignored):
1464             ss.bucket_counter.cpu_slice = orig_cpu_slice
1465             s = ss.bucket_counter.get_state()
1466             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1467             self.failIf("bogusprefix!" in s["storage-index-samples"],
1468                         s["storage-index-samples"].keys())
1469         d.addCallback(_check2)
1470         return d
1471
1472     def test_bucket_counter_eta(self):
1473         basedir = "storage/BucketCounter/bucket_counter_eta"
1474         fileutil.make_dirs(basedir)
1475         ss = MyStorageServer(basedir, "\x00" * 20)
1476         ss.bucket_counter.slow_start = 0
1477         # these will be fired inside finished_prefix()
1478         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1479         w = StorageStatus(ss)
1480
1481         d = defer.Deferred()
1482
1483         def _check_1(ignored):
1484             # no ETA is available yet
1485             html = w.renderSynchronously()
1486             s = remove_tags(html)
1487             self.failUnlessIn("complete (next work", s)
1488
1489         def _check_2(ignored):
1490             # one prefix has finished, so an ETA based upon that elapsed time
1491             # should be available.
1492             html = w.renderSynchronously()
1493             s = remove_tags(html)
1494             self.failUnlessIn("complete (ETA ", s)
1495
1496         def _check_3(ignored):
1497             # two prefixes have finished
1498             html = w.renderSynchronously()
1499             s = remove_tags(html)
1500             self.failUnlessIn("complete (ETA ", s)
1501             d.callback("done")
1502
1503         hooks[0].addCallback(_check_1).addErrback(d.errback)
1504         hooks[1].addCallback(_check_2).addErrback(d.errback)
1505         hooks[2].addCallback(_check_3).addErrback(d.errback)
1506
1507         ss.setServiceParent(self.s)
1508         return d
1509
1510 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1511     stop_after_first_bucket = False
1512     def process_bucket(self, *args, **kwargs):
1513         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1514         if self.stop_after_first_bucket:
1515             self.stop_after_first_bucket = False
1516             self.cpu_slice = -1.0
1517     def yielding(self, sleep_time):
1518         if not self.stop_after_first_bucket:
1519             self.cpu_slice = 500
1520
1521 class BrokenStatResults:
1522     pass
1523 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1524     def stat(self, fn):
1525         s = os.stat(fn)
1526         bsr = BrokenStatResults()
1527         for attrname in dir(s):
1528             if attrname.startswith("_"):
1529                 continue
1530             if attrname == "st_blocks":
1531                 continue
1532             setattr(bsr, attrname, getattr(s, attrname))
1533         return bsr
1534
1535 class InstrumentedStorageServer(StorageServer):
1536     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1537 class No_ST_BLOCKS_StorageServer(StorageServer):
1538     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1539
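# Every LeaseCrawler test below waits for the checker with the same poll
# predicate; a minimal sketch of the idiom (lc being the lease_checker):
def example_cycle_finished(lc):
    # true once at least one full lease-checking cycle has completed
    return lc.get_state()["last-cycle-finished"] is not None
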
1540 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1541
1542     def setUp(self):
1543         self.s = service.MultiService()
1544         self.s.startService()
1545     def tearDown(self):
1546         return self.s.stopService()
1547
1548     def make_shares(self, ss):
1549         def make(si):
1550             return (si, hashutil.tagged_hash("renew", si),
1551                     hashutil.tagged_hash("cancel", si))
1552         def make_mutable(si):
1553             return (si, hashutil.tagged_hash("renew", si),
1554                     hashutil.tagged_hash("cancel", si),
1555                     hashutil.tagged_hash("write-enabler", si))
1556         def make_extra_lease(si, num):
1557             return (hashutil.tagged_hash("renew-%d" % num, si),
1558                     hashutil.tagged_hash("cancel-%d" % num, si))
1559
1560         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1561         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1562         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1563         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1564         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1565         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1566         sharenums = [0]
1567         canary = FakeCanary()
1568         # note: 'tahoe debug dump-share' will not handle this file, since the
1569         # inner contents are not a valid CHK share
1570         data = "\xff" * 1000
1571
1572         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1573                                          1000, canary)
1574         w[0].remote_write(0, data)
1575         w[0].remote_close()
1576
1577         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1578                                          1000, canary)
1579         w[0].remote_write(0, data)
1580         w[0].remote_close()
1581         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1582
1583         writev = ss.remote_slot_testv_and_readv_and_writev
1584         writev(mutable_si_2, (we2, rs2, cs2),
1585                {0: ([], [(0,data)], len(data))}, [])
1586         writev(mutable_si_3, (we3, rs3, cs3),
1587                {0: ([], [(0,data)], len(data))}, [])
1588         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1589
1590         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1591         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1592         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
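        # fixture summary: four buckets holding one share each -- two
        # immutable (si_0, si_1) and two mutable (si_2, si_3); si_1 and
        # si_3 carry a second lease, for six leases total, the counts that
        # the crawler tests below assert against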
1593
1594     def test_basic(self):
1595         basedir = "storage/LeaseCrawler/basic"
1596         fileutil.make_dirs(basedir)
1597         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1598         # make it start sooner than usual.
1599         lc = ss.lease_checker
1600         lc.slow_start = 0
1601         lc.cpu_slice = 500
1602         lc.stop_after_first_bucket = True
1603         webstatus = StorageStatus(ss)
1604
1605         # create a few shares, with some leases on them
1606         self.make_shares(ss)
1607         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1608
1609         # add a non-sharefile to exercise another code path
1610         fn = os.path.join(ss.sharedir,
1611                           storage_index_to_dir(immutable_si_0),
1612                           "not-a-share")
1613         f = open(fn, "wb")
1614         f.write("I am not a share.\n")
1615         f.close()
1616
1617         # this is before the crawl has started, so we're not in a cycle yet
1618         initial_state = lc.get_state()
1619         self.failIf(lc.get_progress()["cycle-in-progress"])
1620         self.failIf("cycle-to-date" in initial_state)
1621         self.failIf("estimated-remaining-cycle" in initial_state)
1622         self.failIf("estimated-current-cycle" in initial_state)
1623         self.failUnless("history" in initial_state)
1624         self.failUnlessEqual(initial_state["history"], {})
1625
1626         ss.setServiceParent(self.s)
1627
1628         DAY = 24*60*60
1629
1630         d = fireEventually()
1631
1632         # now examine the state right after the first bucket has been
1633         # processed.
1634         def _after_first_bucket(ignored):
1635             initial_state = lc.get_state()
1636             self.failUnless("cycle-to-date" in initial_state)
1637             self.failUnless("estimated-remaining-cycle" in initial_state)
1638             self.failUnless("estimated-current-cycle" in initial_state)
1639             self.failUnless("history" in initial_state)
1640             self.failUnlessEqual(initial_state["history"], {})
1641
1642             so_far = initial_state["cycle-to-date"]
1643             self.failUnlessEqual(so_far["expiration-enabled"], False)
1644             self.failUnless("configured-expiration-mode" in so_far)
1645             self.failUnless("lease-age-histogram" in so_far)
1646             lah = so_far["lease-age-histogram"]
1647             self.failUnlessEqual(type(lah), list)
1648             self.failUnlessEqual(len(lah), 1)
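            # each histogram entry appears to be (min-age, max-age, count);
            # so far: one lease, less than a DAY old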
1649             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1650             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1651             self.failUnlessEqual(so_far["corrupt-shares"], [])
1652             sr1 = so_far["space-recovered"]
1653             self.failUnlessEqual(sr1["examined-buckets"], 1)
1654             self.failUnlessEqual(sr1["examined-shares"], 1)
1655             self.failUnlessEqual(sr1["actual-shares"], 0)
1656             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1657             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1658             left = initial_state["estimated-remaining-cycle"]
1659             sr2 = left["space-recovered"]
1660             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1661             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1662             self.failIfEqual(sr2["actual-shares"], None)
1663             self.failIfEqual(sr2["configured-diskbytes"], None)
1664             self.failIfEqual(sr2["original-sharebytes"], None)
1665         d.addCallback(_after_first_bucket)
1666         d.addCallback(lambda ign: self.render1(webstatus))
1667         def _check_html_in_cycle(html):
1668             s = remove_tags(html)
1669             self.failUnlessIn("So far, this cycle has examined "
1670                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1671             self.failUnlessIn("and has recovered: "
1672                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1673                               "0 B (0 B / 0 B)", s)
1674             self.failUnlessIn("If expiration were enabled, "
1675                               "we would have recovered: "
1676                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1677                               " 0 B (0 B / 0 B) by now", s)
1678             self.failUnlessIn("and the remainder of this cycle "
1679                               "would probably recover: "
1680                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1681                               " 0 B (0 B / 0 B)", s)
1682             self.failUnlessIn("and the whole cycle would probably recover: "
1683                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1684                               " 0 B (0 B / 0 B)", s)
1685             self.failUnlessIn("if we were strictly using each lease's default "
1686                               "31-day lease lifetime", s)
1687             self.failUnlessIn("this cycle would be expected to recover: ", s)
1688         d.addCallback(_check_html_in_cycle)
1689
1690         # wait for the crawler to finish the first cycle. Nothing should have
1691         # been removed.
1692         def _wait():
1693             return lc.get_state()["last-cycle-finished"] is not None
1694         d.addCallback(lambda ign: self.poll(_wait))
1695
1696         def _after_first_cycle(ignored):
1697             s = lc.get_state()
1698             self.failIf("cycle-to-date" in s)
1699             self.failIf("estimated-remaining-cycle" in s)
1700             self.failIf("estimated-current-cycle" in s)
1701             last = s["history"][0]
1702             self.failUnless("cycle-start-finish-times" in last)
1703             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1704             self.failUnlessEqual(last["expiration-enabled"], False)
1705             self.failUnless("configured-expiration-mode" in last)
1706
1707             self.failUnless("lease-age-histogram" in last)
1708             lah = last["lease-age-histogram"]
1709             self.failUnlessEqual(type(lah), list)
1710             self.failUnlessEqual(len(lah), 1)
1711             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1712
1713             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1714             self.failUnlessEqual(last["corrupt-shares"], [])
1715
1716             rec = last["space-recovered"]
1717             self.failUnlessEqual(rec["examined-buckets"], 4)
1718             self.failUnlessEqual(rec["examined-shares"], 4)
1719             self.failUnlessEqual(rec["actual-buckets"], 0)
1720             self.failUnlessEqual(rec["original-buckets"], 0)
1721             self.failUnlessEqual(rec["configured-buckets"], 0)
1722             self.failUnlessEqual(rec["actual-shares"], 0)
1723             self.failUnlessEqual(rec["original-shares"], 0)
1724             self.failUnlessEqual(rec["configured-shares"], 0)
1725             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1726             self.failUnlessEqual(rec["original-diskbytes"], 0)
1727             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1728             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1729             self.failUnlessEqual(rec["original-sharebytes"], 0)
1730             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1731
1732             def _get_sharefile(si):
1733                 return list(ss._iter_share_files(si))[0]
1734             def count_leases(si):
1735                 return len(list(_get_sharefile(si).get_leases()))
1736             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1737             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1738             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1739             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1740         d.addCallback(_after_first_cycle)
1741         d.addCallback(lambda ign: self.render1(webstatus))
1742         def _check_html(html):
1743             s = remove_tags(html)
1744             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1745                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1746                               "but expiration was not enabled", s)
1747         d.addCallback(_check_html)
1748         d.addCallback(lambda ign: self.render_json(webstatus))
1749         def _check_json(json):
1750             data = simplejson.loads(json)
1751             self.failUnless("lease-checker" in data)
1752             self.failUnless("lease-checker-progress" in data)
1753         d.addCallback(_check_json)
1754         return d
1755
1756     def backdate_lease(self, sf, renew_secret, new_expire_time):
1757         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1758         # "renew" a lease with a new_expire_time that is older than what the
1759         # current lease has), so we have to reach inside it.
1760         for i,lease in enumerate(sf.get_leases()):
1761             if lease.renew_secret == renew_secret:
1762                 lease.expiration_time = new_expire_time
1763                 f = open(sf.home, 'rb+')
1764                 sf._write_lease_record(f, i, lease)
1765                 f.close()
1766                 return
1767         raise IndexError("unable to renew non-existent lease")
1768
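    # A sketch of the age-mode decision test_expire_age drives (illustrative
    # only; the real policy lives in expirer.py). It assumes the grant/renew
    # time is recovered as expiration_time minus the 31-day default
    # lifetime, as described in test_expire_cutoff_date below:
    @staticmethod
    def example_age_mode_expired(lease_expiration_time, override_duration, now):
        grant_renew_time = lease_expiration_time - 31*24*60*60
        return (now - grant_renew_time) > override_duration
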
1769     def test_expire_age(self):
1770         basedir = "storage/LeaseCrawler/expire_age"
1771         fileutil.make_dirs(basedir)
1772         # setting expiration_override_lease_duration to 2000 means that
1773         # any lease which is more than 2000s old will be expired.
1774         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1775                                        expiration_enabled=True,
1776                                        expiration_mode="age",
1777                                        expiration_override_lease_duration=2000)
1778         # make it start sooner than usual.
1779         lc = ss.lease_checker
1780         lc.slow_start = 0
1781         lc.stop_after_first_bucket = True
1782         webstatus = StorageStatus(ss)
1783
1784         # create a few shares, with some leases on them
1785         self.make_shares(ss)
1786         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1787
1788         def count_shares(si):
1789             return len(list(ss._iter_share_files(si)))
1790         def _get_sharefile(si):
1791             return list(ss._iter_share_files(si))[0]
1792         def count_leases(si):
1793             return len(list(_get_sharefile(si).get_leases()))
1794
1795         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1796         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1797         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1798         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1799         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1800         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1801         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1802         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1803
1804         # artificially crank back the expiration time on the first lease of
1805         # each share, to make it look like it expired already (age=1000s).
1806         # Some shares have an extra lease which is set to expire at the
1807         # default time in 31 days from now (age=31days). We then run the
1808         # crawler, which will expire the first lease, making some shares get
1809         # deleted and others stay alive (with one remaining lease)
1810         now = time.time()
1811
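        # (per the example_age_mode_expired sketch above: under that
        # recovery rule the backdated leases look 31 days + 1000s old, well
        # past the 2000s override, while the untouched extra leases look
        # freshly granted)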
1812         sf0 = _get_sharefile(immutable_si_0)
1813         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1814         sf0_size = os.stat(sf0.home).st_size
1815
1816         # immutable_si_1 gets an extra lease
1817         sf1 = _get_sharefile(immutable_si_1)
1818         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1819
1820         sf2 = _get_sharefile(mutable_si_2)
1821         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1822         sf2_size = os.stat(sf2.home).st_size
1823
1824         # mutable_si_3 gets an extra lease
1825         sf3 = _get_sharefile(mutable_si_3)
1826         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1827
1828         ss.setServiceParent(self.s)
1829
1830         d = fireEventually()
1831         # examine the state right after the first bucket has been processed
1832         def _after_first_bucket(ignored):
1833             p = lc.get_progress()
1834             self.failUnless(p["cycle-in-progress"])
1835         d.addCallback(_after_first_bucket)
1836         d.addCallback(lambda ign: self.render1(webstatus))
1837         def _check_html_in_cycle(html):
1838             s = remove_tags(html)
1839             # the first bucket encountered gets deleted, and its prefix
1840             # happens to be about 1/5th of the way through the ring, so the
1841             # predictor thinks we'll have 5 shares and that we'll delete them
1842             # all. This part of the test depends upon the SIs landing right
1843             # where they do now.
1844             self.failUnlessIn("The remainder of this cycle is expected to "
1845                               "recover: 4 shares, 4 buckets", s)
1846             self.failUnlessIn("The whole cycle is expected to examine "
1847                               "5 shares in 5 buckets and to recover: "
1848                               "5 shares, 5 buckets", s)
1849         d.addCallback(_check_html_in_cycle)
1850
1851         # wait for the crawler to finish the first cycle. Two shares should
1852         # have been removed
1853         def _wait():
1854             return lc.get_state()["last-cycle-finished"] is not None
1855         d.addCallback(lambda ign: self.poll(_wait))
1856
1857         def _after_first_cycle(ignored):
1858             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1859             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1860             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1861             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1862             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1863             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1864
1865             s = lc.get_state()
1866             last = s["history"][0]
1867
1868             self.failUnlessEqual(last["expiration-enabled"], True)
1869             self.failUnlessEqual(last["configured-expiration-mode"],
1870                                  ("age", 2000, None, ("mutable", "immutable")))
1871             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1872
1873             rec = last["space-recovered"]
1874             self.failUnlessEqual(rec["examined-buckets"], 4)
1875             self.failUnlessEqual(rec["examined-shares"], 4)
1876             self.failUnlessEqual(rec["actual-buckets"], 2)
1877             self.failUnlessEqual(rec["original-buckets"], 2)
1878             self.failUnlessEqual(rec["configured-buckets"], 2)
1879             self.failUnlessEqual(rec["actual-shares"], 2)
1880             self.failUnlessEqual(rec["original-shares"], 2)
1881             self.failUnlessEqual(rec["configured-shares"], 2)
1882             size = sf0_size + sf2_size
1883             self.failUnlessEqual(rec["actual-sharebytes"], size)
1884             self.failUnlessEqual(rec["original-sharebytes"], size)
1885             self.failUnlessEqual(rec["configured-sharebytes"], size)
1886             # different platforms have different notions of "blocks used by
1887             # this file", so merely assert that it's non-negative
1888             self.failUnless(rec["actual-diskbytes"] >= 0,
1889                             rec["actual-diskbytes"])
1890             self.failUnless(rec["original-diskbytes"] >= 0,
1891                             rec["original-diskbytes"])
1892             self.failUnless(rec["configured-diskbytes"] >= 0,
1893                             rec["configured-diskbytes"])
1894         d.addCallback(_after_first_cycle)
1895         d.addCallback(lambda ign: self.render1(webstatus))
1896         def _check_html(html):
1897             s = remove_tags(html)
1898             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1899             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1900             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1901         d.addCallback(_check_html)
1902         return d
1903
1904     def test_expire_cutoff_date(self):
1905         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1906         fileutil.make_dirs(basedir)
1907         # setting cutoff-date to 2000 seconds ago means that any lease which
1908         # is more than 2000s old will be expired.
1909         now = time.time()
1910         then = int(now - 2000)
1911         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1912                                        expiration_enabled=True,
1913                                        expiration_mode="cutoff-date",
1914                                        expiration_cutoff_date=then)
1915         # make it start sooner than usual.
1916         lc = ss.lease_checker
1917         lc.slow_start = 0
1918         lc.stop_after_first_bucket = True
1919         webstatus = StorageStatus(ss)
1920
1921         # create a few shares, with some leases on them
1922         self.make_shares(ss)
1923         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1924
1925         def count_shares(si):
1926             return len(list(ss._iter_share_files(si)))
1927         def _get_sharefile(si):
1928             return list(ss._iter_share_files(si))[0]
1929         def count_leases(si):
1930             return len(list(_get_sharefile(si).get_leases()))
1931
1932         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1933         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1934         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1935         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1936         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1937         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1938         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1939         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1940
1941         # artificially crank back the expiration time on the first lease of
1942         # each share, to make it look like it was renewed 3000s ago. To achieve
1943         # this, we need to set the expiration time to now-3000+31days. This
1944         # will change when the lease format is improved to contain both
1945         # create/renew time and duration.
1946         new_expiration_time = now - 3000 + 31*24*60*60
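        # i.e. the recovered renew time is (now-3000+31days) - 31days =
        # now-3000, which falls 1000s before the now-2000 cutoff, so these
        # leases should be collected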
1947
1948         # Some shares have an extra lease which is set to expire at the
1949         # default time in 31 days from now (age=31days). We then run the
1950         # crawler, which will expire the first lease, making some shares get
1951         # deleted and others stay alive (with one remaining lease)
1952
1953         sf0 = _get_sharefile(immutable_si_0)
1954         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1955         sf0_size = os.stat(sf0.home).st_size
1956
1957         # immutable_si_1 gets an extra lease
1958         sf1 = _get_sharefile(immutable_si_1)
1959         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1960
1961         sf2 = _get_sharefile(mutable_si_2)
1962         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1963         sf2_size = os.stat(sf2.home).st_size
1964
1965         # mutable_si_3 gets an extra lease
1966         sf3 = _get_sharefile(mutable_si_3)
1967         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1968
1969         ss.setServiceParent(self.s)
1970
1971         d = fireEventually()
1972         # examine the state right after the first bucket has been processed
1973         def _after_first_bucket(ignored):
1974             p = lc.get_progress()
1975             self.failUnless(p["cycle-in-progress"])
1976         d.addCallback(_after_first_bucket)
1977         d.addCallback(lambda ign: self.render1(webstatus))
1978         def _check_html_in_cycle(html):
1979             s = remove_tags(html)
1980             # the first bucket encountered gets deleted, and its prefix
1981             # happens to be about 1/5th of the way through the ring, so the
1982             # predictor thinks we'll have 5 shares and that we'll delete them
1983             # all. This part of the test depends upon the SIs landing right
1984             # where they do now.
1985             self.failUnlessIn("The remainder of this cycle is expected to "
1986                               "recover: 4 shares, 4 buckets", s)
1987             self.failUnlessIn("The whole cycle is expected to examine "
1988                               "5 shares in 5 buckets and to recover: "
1989                               "5 shares, 5 buckets", s)
1990         d.addCallback(_check_html_in_cycle)
1991
1992         # wait for the crawler to finish the first cycle. Two shares should
1993         # have been removed
1994         def _wait():
1995             return lc.get_state()["last-cycle-finished"] is not None
1996         d.addCallback(lambda ign: self.poll(_wait))
1997
1998         def _after_first_cycle(ignored):
1999             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2000             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2001             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2002             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2003             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2004             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2005
2006             s = lc.get_state()
2007             last = s["history"][0]
2008
2009             self.failUnlessEqual(last["expiration-enabled"], True)
2010             self.failUnlessEqual(last["configured-expiration-mode"],
2011                                  ("cutoff-date", None, then,
2012                                   ("mutable", "immutable")))
2013             self.failUnlessEqual(last["leases-per-share-histogram"],
2014                                  {1: 2, 2: 2})
2015
2016             rec = last["space-recovered"]
2017             self.failUnlessEqual(rec["examined-buckets"], 4)
2018             self.failUnlessEqual(rec["examined-shares"], 4)
2019             self.failUnlessEqual(rec["actual-buckets"], 2)
2020             self.failUnlessEqual(rec["original-buckets"], 0)
2021             self.failUnlessEqual(rec["configured-buckets"], 2)
2022             self.failUnlessEqual(rec["actual-shares"], 2)
2023             self.failUnlessEqual(rec["original-shares"], 0)
2024             self.failUnlessEqual(rec["configured-shares"], 2)
2025             size = sf0_size + sf2_size
2026             self.failUnlessEqual(rec["actual-sharebytes"], size)
2027             self.failUnlessEqual(rec["original-sharebytes"], 0)
2028             self.failUnlessEqual(rec["configured-sharebytes"], size)
2029             # different platforms have different notions of "blocks used by
2030             # this file", so merely assert that it's non-negative
2031             self.failUnless(rec["actual-diskbytes"] >= 0,
2032                             rec["actual-diskbytes"])
2033             self.failUnless(rec["original-diskbytes"] >= 0,
2034                             rec["original-diskbytes"])
2035             self.failUnless(rec["configured-diskbytes"] >= 0,
2036                             rec["configured-diskbytes"])
2037         d.addCallback(_after_first_cycle)
2038         d.addCallback(lambda ign: self.render1(webstatus))
2039         def _check_html(html):
2040             s = remove_tags(html)
2041             self.failUnlessIn("Expiration Enabled:"
2042                               " expired leases will be removed", s)
2043             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2044             substr = "Leases created or last renewed before %s will be considered expired." % date
2045             self.failUnlessIn(substr, s)
2046             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2047         d.addCallback(_check_html)
2048         return d
2049
2050     def test_only_immutable(self):
2051         basedir = "storage/LeaseCrawler/only_immutable"
2052         fileutil.make_dirs(basedir)
2053         now = time.time()
2054         then = int(now - 2000)
2055         ss = StorageServer(basedir, "\x00" * 20,
2056                            expiration_enabled=True,
2057                            expiration_mode="cutoff-date",
2058                            expiration_cutoff_date=then,
2059                            expiration_sharetypes=("immutable",))
2060         lc = ss.lease_checker
2061         lc.slow_start = 0
2062         webstatus = StorageStatus(ss)
2063
2064         self.make_shares(ss)
2065         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2066         # set all leases to be expirable
2067         new_expiration_time = now - 3000 + 31*24*60*60
2068
2069         def count_shares(si):
2070             return len(list(ss._iter_share_files(si)))
2071         def _get_sharefile(si):
2072             return list(ss._iter_share_files(si))[0]
2073         def count_leases(si):
2074             return len(list(_get_sharefile(si).get_leases()))
2075
2076         sf0 = _get_sharefile(immutable_si_0)
2077         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2078         sf1 = _get_sharefile(immutable_si_1)
2079         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2080         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2081         sf2 = _get_sharefile(mutable_si_2)
2082         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2083         sf3 = _get_sharefile(mutable_si_3)
2084         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2085         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2086
2087         ss.setServiceParent(self.s)
2088         def _wait():
2089             return lc.get_state()["last-cycle-finished"] is not None
2090         d = self.poll(_wait)
2091
2092         def _after_first_cycle(ignored):
2093             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2094             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2095             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2096             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2097             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2098             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2099         d.addCallback(_after_first_cycle)
2100         d.addCallback(lambda ign: self.render1(webstatus))
2101         def _check_html(html):
2102             s = remove_tags(html)
2103             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2104         d.addCallback(_check_html)
2105         return d
2106
2107     def test_only_mutable(self):
2108         basedir = "storage/LeaseCrawler/only_mutable"
2109         fileutil.make_dirs(basedir)
2110         now = time.time()
2111         then = int(now - 2000)
2112         ss = StorageServer(basedir, "\x00" * 20,
2113                            expiration_enabled=True,
2114                            expiration_mode="cutoff-date",
2115                            expiration_cutoff_date=then,
2116                            expiration_sharetypes=("mutable",))
2117         lc = ss.lease_checker
2118         lc.slow_start = 0
2119         webstatus = StorageStatus(ss)
2120
2121         self.make_shares(ss)
2122         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2123         # set all leases to be expirable
2124         new_expiration_time = now - 3000 + 31*24*60*60
2125
2126         def count_shares(si):
2127             return len(list(ss._iter_share_files(si)))
2128         def _get_sharefile(si):
2129             return list(ss._iter_share_files(si))[0]
2130         def count_leases(si):
2131             return len(list(_get_sharefile(si).get_leases()))
2132
2133         sf0 = _get_sharefile(immutable_si_0)
2134         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2135         sf1 = _get_sharefile(immutable_si_1)
2136         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2137         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2138         sf2 = _get_sharefile(mutable_si_2)
2139         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2140         sf3 = _get_sharefile(mutable_si_3)
2141         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2142         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2143
2144         ss.setServiceParent(self.s)
2145         def _wait():
2146             return lc.get_state()["last-cycle-finished"] is not None
2147         d = self.poll(_wait)
2148
2149         def _after_first_cycle(ignored):
2150             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2151             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2152             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2153             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2154             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2155             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2156         d.addCallback(_after_first_cycle)
2157         d.addCallback(lambda ign: self.render1(webstatus))
2158         def _check_html(html):
2159             s = remove_tags(html)
2160             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2161         d.addCallback(_check_html)
2162         return d
2163
2164     def test_bad_mode(self):
2165         basedir = "storage/LeaseCrawler/bad_mode"
2166         fileutil.make_dirs(basedir)
2167         e = self.failUnlessRaises(ValueError,
2168                                   StorageServer, basedir, "\x00" * 20,
2169                                   expiration_mode="bogus")
2170         self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))
2171
2172     def test_parse_duration(self):
2173         DAY = 24*60*60
2174         MONTH = 31*DAY
2175         YEAR = 365*DAY
2176         p = time_format.parse_duration
2177         self.failUnlessEqual(p("7days"), 7*DAY)
2178         self.failUnlessEqual(p("31day"), 31*DAY)
2179         self.failUnlessEqual(p("60 days"), 60*DAY)
2180         self.failUnlessEqual(p("2mo"), 2*MONTH)
2181         self.failUnlessEqual(p("3 month"), 3*MONTH)
2182         self.failUnlessEqual(p("2years"), 2*YEAR)
2183         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2184         self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2185                         in str(e), str(e))
2186
2187     def test_parse_date(self):
2188         p = time_format.parse_date
2189         self.failUnless(isinstance(p("2009-03-18"), int))
2190         self.failUnlessEqual(p("2009-03-18"), 1237334400)
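        # (1237334400 is 2009-03-18 00:00:00 UTC, so parse_date evidently
        # returns seconds-since-epoch at UTC midnight)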
2191
2192     def test_limited_history(self):
2193         basedir = "storage/LeaseCrawler/limited_history"
2194         fileutil.make_dirs(basedir)
2195         ss = StorageServer(basedir, "\x00" * 20)
2196         # make it start sooner than usual.
2197         lc = ss.lease_checker
2198         lc.slow_start = 0
2199         lc.cpu_slice = 500
2200
2201         # create a few shares, with some leases on them
2202         self.make_shares(ss)
2203
2204         ss.setServiceParent(self.s)
2205
2206         def _wait_until_15_cycles_done():
2207             last = lc.state["last-cycle-finished"]
2208             if last is not None and last >= 15:
2209                 return True
2210             if lc.timer:
2211                 lc.timer.reset(0)
2212             return False
2213         d = self.poll(_wait_until_15_cycles_done)
2214
2215         def _check(ignored):
2216             s = lc.get_state()
2217             h = s["history"]
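            # only the 10 most recent finished cycles should be retained:
            # after cycle 15 finishes, cycles 6..15 remain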
2218             self.failUnlessEqual(len(h), 10)
2219             self.failUnlessEqual(max(h.keys()), 15)
2220             self.failUnlessEqual(min(h.keys()), 6)
2221         d.addCallback(_check)
2222         return d
2223
2224     def test_unpredictable_future(self):
2225         basedir = "storage/LeaseCrawler/unpredictable_future"
2226         fileutil.make_dirs(basedir)
2227         ss = StorageServer(basedir, "\x00" * 20)
2228         # make it start sooner than usual.
2229         lc = ss.lease_checker
2230         lc.slow_start = 0
2231         lc.cpu_slice = -1.0 # stop quickly
2232
2233         self.make_shares(ss)
2234
2235         ss.setServiceParent(self.s)
2236
2237         d = fireEventually()
2238         def _check(ignored):
2239             # this should fire after the first bucket is complete, but before
2240             # the first prefix is complete, so the progress-measurer won't
2241             # think we've gotten far enough to raise our percent-complete
2242             # above 0%, triggering the cannot-predict-the-future code in
2243             # expirer.py. This will have to change if/when the
2244             # progress-measurer gets smart enough to count buckets (we'll
2245             # have to interrupt it even earlier, before it's finished the
2246             # first bucket).
2247             s = lc.get_state()
2248             self.failUnless("cycle-to-date" in s)
2249             self.failUnless("estimated-remaining-cycle" in s)
2250             self.failUnless("estimated-current-cycle" in s)
2251
2252             left = s["estimated-remaining-cycle"]["space-recovered"]
2253             self.failUnlessEqual(left["actual-buckets"], None)
2254             self.failUnlessEqual(left["original-buckets"], None)
2255             self.failUnlessEqual(left["configured-buckets"], None)
2256             self.failUnlessEqual(left["actual-shares"], None)
2257             self.failUnlessEqual(left["original-shares"], None)
2258             self.failUnlessEqual(left["configured-shares"], None)
2259             self.failUnlessEqual(left["actual-diskbytes"], None)
2260             self.failUnlessEqual(left["original-diskbytes"], None)
2261             self.failUnlessEqual(left["configured-diskbytes"], None)
2262             self.failUnlessEqual(left["actual-sharebytes"], None)
2263             self.failUnlessEqual(left["original-sharebytes"], None)
2264             self.failUnlessEqual(left["configured-sharebytes"], None)
2265
2266             full = s["estimated-current-cycle"]["space-recovered"]
2267             self.failUnlessEqual(full["actual-buckets"], None)
2268             self.failUnlessEqual(full["original-buckets"], None)
2269             self.failUnlessEqual(full["configured-buckets"], None)
2270             self.failUnlessEqual(full["actual-shares"], None)
2271             self.failUnlessEqual(full["original-shares"], None)
2272             self.failUnlessEqual(full["configured-shares"], None)
2273             self.failUnlessEqual(full["actual-diskbytes"], None)
2274             self.failUnlessEqual(full["original-diskbytes"], None)
2275             self.failUnlessEqual(full["configured-diskbytes"], None)
2276             self.failUnlessEqual(full["actual-sharebytes"], None)
2277             self.failUnlessEqual(full["original-sharebytes"], None)
2278             self.failUnlessEqual(full["configured-sharebytes"], None)
2279
2280         d.addCallback(_check)
2281         return d
2282
2283     def test_no_st_blocks(self):
2284         basedir = "storage/LeaseCrawler/no_st_blocks"
2285         fileutil.make_dirs(basedir)
2286         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2287                                         expiration_mode="age",
2288                                         expiration_override_lease_duration=-1000)
2289         # a negative expiration_override_lease_duration means the
2290         # "configured-" space-recovered counts will be non-zero, since every
2291         # lease will already have expired
2292
2293         # make it start sooner than usual.
2294         lc = ss.lease_checker
2295         lc.slow_start = 0
2296
2297         self.make_shares(ss)
2298         ss.setServiceParent(self.s)
2299         def _wait():
2300             return lc.get_state()["last-cycle-finished"] is not None
2301         d = self.poll(_wait)
2302
2303         def _check(ignored):
2304             s = lc.get_state()
2305             last = s["history"][0]
2306             rec = last["space-recovered"]
2307             self.failUnlessEqual(rec["configured-buckets"], 4)
2308             self.failUnlessEqual(rec["configured-shares"], 4)
2309             self.failUnless(rec["configured-sharebytes"] > 0,
2310                             rec["configured-sharebytes"])
2311             # without the .st_blocks field in os.stat() results, we should be
2312             # reporting diskbytes==sharebytes
2313             self.failUnlessEqual(rec["configured-sharebytes"],
2314                                  rec["configured-diskbytes"])
2315         d.addCallback(_check)
2316         return d
2317
2318     def test_share_corruption(self):
2319         self._poll_should_ignore_these_errors = [
2320             UnknownMutableContainerVersionError,
2321             UnknownImmutableContainerVersionError,
2322             ]
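              # (the exceptions just listed are the ones the deliberately
              # corrupted share below will produce; naming them here keeps the
              # poll helper from treating those logged errors as a test
              # failure)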
2323         basedir = "storage/LeaseCrawler/share_corruption"
2324         fileutil.make_dirs(basedir)
2325         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2326         w = StorageStatus(ss)
2327         # make it start sooner than usual.
2328         lc = ss.lease_checker
2329         lc.stop_after_first_bucket = True
2330         lc.slow_start = 0
2331         lc.cpu_slice = 500
2332
2333         # create a few shares, with some leases on them
2334         self.make_shares(ss)
2335
2336         # now corrupt one, and make sure the lease-checker keeps going
2337         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2338         first = min(self.sis)
2339         first_b32 = base32.b2a(first)
2340         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2341         f = open(fn, "rb+")
2342         f.seek(0)
2343         f.write("BAD MAGIC")
2344         f.close()
2345         # if get_share_file() doesn't see the correct mutable magic, it
2346         # assumes the file is an immutable share, and then
2347         # immutable.ShareFile sees a bad version. So regardless of which kind
2348         # of share we corrupted, this will trigger an
2349         # UnknownImmutableContainerVersionError.
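              # (a sketch of that dispatch, assuming get_share_file() peeks at
              # the leading magic bytes, roughly:
              #     prefix = open(fn, "rb").read(32)
              #     if prefix.startswith(MutableShareFile.MAGIC):
              #         return MutableShareFile(fn)
              #     return ShareFile(fn)  # whose version check then rejects
              #                           # our "BAD MAGIC" prefix)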
2350
2351         # also create an empty bucket
2352         empty_si = base32.b2a("\x04"*16)
2353         empty_bucket_dir = os.path.join(ss.sharedir,
2354                                         storage_index_to_dir(empty_si))
2355         fileutil.make_dirs(empty_bucket_dir)
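              # (the crawler should count this bucket as examined but find no
              # shares inside it; that is why the end-of-cycle checks below
              # expect examined-buckets == 5 but examined-shares == 3)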
2356
2357         ss.setServiceParent(self.s)
2358
2359         d = fireEventually()
2360
2361         # now examine the state right after the first bucket has been
2362         # processed.
2363         def _after_first_bucket(ignored):
2364             so_far = lc.get_state()["cycle-to-date"]
2365             rec = so_far["space-recovered"]
2366             self.failUnlessEqual(rec["examined-buckets"], 1)
2367             self.failUnlessEqual(rec["examined-shares"], 0)
2368             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2369         d.addCallback(_after_first_bucket)
2370
2371         d.addCallback(lambda ign: self.render_json(w))
2372         def _check_json(json):
2373             data = simplejson.loads(json)
2374             # grr. json turns all dict keys into strings.
2375             so_far = data["lease-checker"]["cycle-to-date"]
2376             corrupt_shares = so_far["corrupt-shares"]
2377             # it also turns all tuples into lists
2378             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
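                  # (for reference, the round-trip in question:
                  #  simplejson.loads(simplejson.dumps({1: ("a", 0)}))
                  #  evaluates to {"1": ["a", 0]})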
2379         d.addCallback(_check_json)
2380         d.addCallback(lambda ign: self.render1(w))
2381         def _check_html(html):
2382             s = remove_tags(html)
2383             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2384         d.addCallback(_check_html)
2385
2386         def _wait():
2387             return lc.get_state()["last-cycle-finished"] is not None
2388         d.addCallback(lambda ign: self.poll(_wait))
2389
2390         def _after_first_cycle(ignored):
2391             s = lc.get_state()
2392             last = s["history"][0]
2393             rec = last["space-recovered"]
2394             self.failUnlessEqual(rec["examined-buckets"], 5)
2395             self.failUnlessEqual(rec["examined-shares"], 3)
2396             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2397         d.addCallback(_after_first_cycle)
2398         d.addCallback(lambda ign: self.render_json(w))
2399         def _check_json_history(json):
2400             data = simplejson.loads(json)
2401             last = data["lease-checker"]["history"]["0"]
2402             corrupt_shares = last["corrupt-shares"]
2403             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2404         d.addCallback(_check_json_history)
2405         d.addCallback(lambda ign: self.render1(w))
2406         def _check_html_history(html):
2407             s = remove_tags(html)
2408             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2409         d.addCallback(_check_html_history)
2410
2411         def _cleanup(res):
2412             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2413                                    UnknownImmutableContainerVersionError)
2414             return res
2415         d.addBoth(_cleanup)
2416         return d
2417
2418     def render_json(self, page):
2419         d = self.render1(page, args={"t": ["json"]})
2420         return d
2421
2422 class NoDiskStatsServer(StorageServer):
2423     def get_disk_stats(self):
2424         raise AttributeError
2425
2426 class BadDiskStatsServer(StorageServer):
2427     def get_disk_stats(self):
2428         raise OSError
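      # (these two fakes simulate, respectively, a platform with no disk-stats
      # API at all and one where the API exists but the call fails; the
      # WebStatus tests below exercise both paths)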
2429
2430 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2431
2432     def setUp(self):
2433         self.s = service.MultiService()
2434         self.s.startService()
2435     def tearDown(self):
2436         return self.s.stopService()
2437
2438     def test_no_server(self):
2439         w = StorageStatus(None)
2440         html = w.renderSynchronously()
2441         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
2442
2443     def test_status(self):
2444         basedir = "storage/WebStatus/status"
2445         fileutil.make_dirs(basedir)
2446         ss = StorageServer(basedir, "\x00" * 20)
2447         ss.setServiceParent(self.s)
2448         w = StorageStatus(ss)
2449         d = self.render1(w)
2450         def _check_html(html):
2451             self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2452             s = remove_tags(html)
2453             self.failUnless("Accepting new shares: Yes" in s, s)
2454             self.failUnless("Reserved space: - 0 B (0)" in s, s)
2455         d.addCallback(_check_html)
2456         d.addCallback(lambda ign: self.render_json(w))
2457         def _check_json(json):
2458             data = simplejson.loads(json)
2459             s = data["stats"]
2460             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2461             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2462             self.failUnless("bucket-counter" in data)
2463             self.failUnless("lease-checker" in data)
2464         d.addCallback(_check_json)
2465         return d
2466
2467     def render_json(self, page):
2468         d = self.render1(page, args={"t": ["json"]})
2469         return d
2470
2471     def test_status_no_disk_stats(self):
2472         # Some platforms may have no disk stats API. Make sure the code can handle that
2473         # (test runs on all platforms).
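              # (a sketch of the pattern under test, assuming the server wraps
              # its stats call roughly like this:
              #     try:
              #         stats = self.get_disk_stats()
              #     except AttributeError:
              #         return None   # unknown: sizes render as "?" below)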
2474         basedir = "storage/WebStatus/status_no_disk_stats"
2475         fileutil.make_dirs(basedir)
2476         ss = NoDiskStatsServer(basedir, "\x00" * 20)
2477         ss.setServiceParent(self.s)
2478         w = StorageStatus(ss)
2479         html = w.renderSynchronously()
2480         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2481         s = remove_tags(html)
2482         self.failUnless("Accepting new shares: Yes" in s, s)
2483         self.failUnless("Total disk space: ?" in s, s)
2484         self.failUnless("Space Available to Tahoe: ?" in s, s)
2485         self.failUnless(ss.get_available_space() is None)
2486
2487     def test_status_bad_disk_stats(self):
2488         # If the API to get disk stats exists but a call to it fails, then the status should
2489         # show that no shares will be accepted, and get_available_space() should return 0.
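              # (continuing the sketch from the previous test, the failing
              # call is presumably caught along the lines of:
              #     except EnvironmentError:
              #         return 0   # assume the disk is full: accept nothing
              # which is why available space reads as 0 here rather than None)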
2490         basedir = "storage/WebStatus/status_bad_disk_stats"
2491         fileutil.make_dirs(basedir)
2492         ss = BadDiskStatsServer(basedir, "\x00" * 20)
2493         ss.setServiceParent(self.s)
2494         w = StorageStatus(ss)
2495         html = w.renderSynchronously()
2496         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2497         s = remove_tags(html)
2498         self.failUnless("Accepting new shares: No" in s, s)
2499         self.failUnless("Total disk space: ?" in s, s)
2500         self.failUnless("Space Available to Tahoe: ?" in s, s)
2501         self.failUnless(ss.get_available_space() == 0)
2502
2503     def test_readonly(self):
2504         basedir = "storage/WebStatus/readonly"
2505         fileutil.make_dirs(basedir)
2506         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2507         ss.setServiceParent(self.s)
2508         w = StorageStatus(ss)
2509         html = w.renderSynchronously()
2510         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2511         s = remove_tags(html)
2512         self.failUnless("Accepting new shares: No" in s, s)
2513
2514     def test_reserved(self):
2515         basedir = "storage/WebStatus/reserved"
2516         fileutil.make_dirs(basedir)
2517         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2518         ss.setServiceParent(self.s)
2519         w = StorageStatus(ss)
2520         html = w.renderSynchronously()
2521         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2522         s = remove_tags(html)
2523         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2524
2525     def test_huge_reserved(self):
2526         basedir = "storage/WebStatus/huge_reserved"
2527         fileutil.make_dirs(basedir)
2528         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e9)
2529         ss.setServiceParent(self.s)
2530         w = StorageStatus(ss)
2531         html = w.renderSynchronously()
2532         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2533         s = remove_tags(html)
2534         self.failUnless("Reserved space: - 15.00 GB (15000000000)" in s, s)
2535
2536     def test_util(self):
2537         w = StorageStatus(None)
2538         self.failUnlessEqual(w.render_space(None, None), "?")
2539         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2540         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2541         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2542         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2543         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2544