
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
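# FakeCanary mimics the Foolscap 'canary' argument (a RemoteReference on
# which the storage server calls notifyOnDisconnect); tests fire the recorded
# disconnectors to simulate a client disconnecting mid-upload (see
# test_disconnect below).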
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

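# FakeStatsProvider is a no-op stand-in for the node's stats provider; the
# StorageServer only needs count() and register_producer() to exist.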
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

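# In the Bucket and BucketProxy tests, the test case itself is passed to
# BucketWriter/BucketReader in place of a StorageServer: it supplies the
# bucket_writer_closed/add_latency/count callbacks that a real server would.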
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

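# RemoteBucket adapts a local BucketWriter/BucketReader to the Foolscap
# calling convention: callRemote("foo", ...) dispatches to target.remote_foo()
# and always returns a Deferred, via maybeDeferred.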
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
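        # (the header sizes are consistent with a 4-byte version field
        #  followed by eight offset/size fields: 4 bytes each in v1
        #  (0x24 = 4 + 8*4 = 36) and 8 bytes each in v2
        #  (0x44 = 4 + 8*8 = 68), which is what lets v2 containers hold
        #  shares larger than 4 GiB)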

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

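# FakeDiskStorageServer overrides get_disk_stats so the tests can dictate how
# much disk appears to be available (via DISKAVAIL) instead of measuring the
# real filesystem.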
class FakeDiskStorageServer(StorageServer):
    DISKAVAIL = 0
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL, 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an existing share (but is not the
        exact same storage index) does not add a new entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # FakeDiskStorageServer.get_disk_stats doesn't query the real disk,
        # so we control the reported free space directly
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
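        # (these constants track the on-disk immutable share format: OVERHEAD
        #  corresponds to the share file's three 4-byte header fields, and
        #  LEASE_SIZE to a lease record holding a 4-byte owner number, two
        #  32-byte secrets, and a 4-byte expiration time)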
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation that grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_disk_stats(self):
        # This will spuriously fail if there is zero disk space left (but so will other tests).
        ss = self.create("test_disk_stats", reserved_space=0)

        disk = ss.get_disk_stats()
        self.failUnless(disk['total'] > 0, disk['total'])
        self.failUnless(disk['used'] > 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)

        disk = ss.get_disk_stats()
        self.failUnlessEqual(disk['avail'], 0)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
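        # each share's entry in the test-and-write vector is a tuple of
        # (testv, writev, new_length): testv is a list of (offset, length,
        # operator, specimen) comparisons, writev is a list of (offset, data)
        # writes, and new_length (or None) optionally resizes the container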
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
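        # each write below is gated on a single test vector against the five
        # bytes at offset 10 (initially "11111"). The response is (success,
        # read_data), and the read data reflects the share contents from
        # before any writes are applied: note that even successful writes
        # report the old "11111" below.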

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
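        # each value of n yields a distinct renew/cancel secret pair (under
        # the same write enabler), so each write with a new n adds a lease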
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
1276         # may change in the future
1277         prefix = si[:2]
1278         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1279         bucketdir = os.path.join(prefixdir, si)
1280         self.failUnless(os.path.exists(prefixdir), prefixdir)
1281         self.failIf(os.path.exists(bucketdir), bucketdir)
1282
1283 class Stats(unittest.TestCase):
1284
1285     def setUp(self):
1286         self.sparent = LoggingServiceParent()
1287         self._lease_secret = itertools.count()
1288     def tearDown(self):
1289         return self.sparent.stopService()
1290
1291     def workdir(self, name):
1292         basedir = os.path.join("storage", "Server", name)
1293         return basedir
1294
1295     def create(self, name):
1296         workdir = self.workdir(name)
1297         ss = StorageServer(workdir, "\x00" * 20)
1298         ss.setServiceParent(self.sparent)
1299         return ss
1300
1301     def test_latencies(self):
1302         ss = self.create("test_latencies")
1303         for i in range(10000):
1304             ss.add_latency("allocate", 1.0 * i)
1305         for i in range(1000):
1306             ss.add_latency("renew", 1.0 * i)
1307         for i in range(10):
1308             ss.add_latency("cancel", 2.0 * i)
1309         ss.add_latency("get", 5.0)
1310
1311         output = ss.get_latencies()
1312
1313         self.failUnlessEqual(sorted(output.keys()),
1314                              sorted(["allocate", "renew", "cancel", "get"]))
1315         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
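        # (the latency buffer keeps only the most recent 1000 samples, so the
        # surviving "allocate" values are 9000..9999: mean = (9000+9999)/2 =
        # 9499.5, and the 1/10/50/90/95/99/99.9 percentiles fall at sorted
        # indices 10, 100, 500, 900, 950, 990, and 999, i.e. 9010..9999)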
1316         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1317         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1318         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1319         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1320         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1321         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1322         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1323         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1324
1325         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1326         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1327         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1328         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1329         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1330         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1331         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1332         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1333         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1334
1335         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1336         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1337         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1, output)
1338         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1339         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1340         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1341         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1342         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1343         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1344
1345         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1346         self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1347         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1348         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1349         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1350         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1351         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1352         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1353         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1354
1355 def remove_tags(s):
1356     s = re.sub(r'<[^>]*>', ' ', s)
1357     s = re.sub(r'\s+', ' ', s)
1358     return s
1359
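# A quick illustrative sketch of what remove_tags produces (the HTML
# fragment here is hypothetical, not copied from the real status page):
# tags become single spaces and whitespace runs then collapse, e.g.
#     remove_tags("<p>Total  buckets:</p> <b>0</b>") == " Total buckets: 0 "
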
1360 class MyBucketCountingCrawler(BucketCountingCrawler):
1361     def finished_prefix(self, cycle, prefix):
1362         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1363         if self.hook_ds:
1364             d = self.hook_ds.pop(0)
1365             d.callback(None)
1366
1367 class MyStorageServer(StorageServer):
1368     def add_bucket_counter(self):
1369         statefile = os.path.join(self.storedir, "bucket_counter.state")
1370         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1371         self.bucket_counter.setServiceParent(self)
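
# A minimal sketch of how the hook is driven (mirroring
# test_bucket_counter_eta below); each Deferred in hook_ds fires, in order,
# as one more prefix finishes:
#     ss = MyStorageServer(basedir, "\x00" * 20)
#     ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]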
1372
1373 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1374
1375     def setUp(self):
1376         self.s = service.MultiService()
1377         self.s.startService()
1378     def tearDown(self):
1379         return self.s.stopService()
1380
1381     def test_bucket_counter(self):
1382         basedir = "storage/BucketCounter/bucket_counter"
1383         fileutil.make_dirs(basedir)
1384         ss = StorageServer(basedir, "\x00" * 20)
1385         # to make sure we capture the bucket-counting-crawler in the middle
1386         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1387         # also make it start sooner than usual.
1388         ss.bucket_counter.slow_start = 0
1389         orig_cpu_slice = ss.bucket_counter.cpu_slice
1390         ss.bucket_counter.cpu_slice = 0
1391         ss.setServiceParent(self.s)
1392
1393         w = StorageStatus(ss)
1394
1395         # this sample is before the crawler has started doing anything
1396         html = w.renderSynchronously()
1397         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1398         s = remove_tags(html)
1399         self.failUnlessIn("Accepting new shares: Yes", s)
1400         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1401         self.failUnlessIn("Total buckets: Not computed yet", s)
1402         self.failUnlessIn("Next crawl in", s)
1403
1404         # give the bucket-counting-crawler one tick to get started. The
1405         # cpu_slice=0 will force it to yield right after it processes the
1406         # first prefix
1407
1408         d = fireEventually()
1409         def _check(ignored):
1410             # are we really right after the first prefix?
1411             state = ss.bucket_counter.get_state()
1412             if state["last-complete-prefix"] is None:
1413                 d2 = fireEventually()
1414                 d2.addCallback(_check)
1415                 return d2
1416             self.failUnlessEqual(state["last-complete-prefix"],
1417                                  ss.bucket_counter.prefixes[0])
1418             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1419             html = w.renderSynchronously()
1420             s = remove_tags(html)
1421             self.failUnlessIn(" Current crawl ", s)
1422             self.failUnlessIn(" (next work in ", s)
1423         d.addCallback(_check)
1424
1425         # now give it enough time to complete a full cycle
1426         def _watch():
1427             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1428         d.addCallback(lambda ignored: self.poll(_watch))
1429         def _check2(ignored):
1430             ss.bucket_counter.cpu_slice = orig_cpu_slice
1431             html = w.renderSynchronously()
1432             s = remove_tags(html)
1433             self.failUnlessIn("Total buckets: 0 (the number of", s)
1434             self.failUnlessIn("Next crawl in 59 minutes", s)
1435         d.addCallback(_check2)
1436         return d
1437
1438     def test_bucket_counter_cleanup(self):
1439         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1440         fileutil.make_dirs(basedir)
1441         ss = StorageServer(basedir, "\x00" * 20)
1442         # to make sure we capture the bucket-counting-crawler in the middle
1443         # of a cycle, we reach in and reduce its maximum slice time to 0.
1444         ss.bucket_counter.slow_start = 0
1445         orig_cpu_slice = ss.bucket_counter.cpu_slice
1446         ss.bucket_counter.cpu_slice = 0
1447         ss.setServiceParent(self.s)
1448
1449         d = fireEventually()
1450
1451         def _after_first_prefix(ignored):
1452             state = ss.bucket_counter.state
1453             if state["last-complete-prefix"] is None:
1454                 d2 = fireEventually()
1455                 d2.addCallback(_after_first_prefix)
1456                 return d2
1457             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1458             # now sneak in and mess with its state, to make sure it cleans up
1459             # properly at the end of the cycle
1460             self.failUnlessEqual(state["last-complete-prefix"],
1461                                  ss.bucket_counter.prefixes[0])
1462             state["bucket-counts"][-12] = {}
1463             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1464             ss.bucket_counter.save_state()
1465         d.addCallback(_after_first_prefix)
1466
1467         # now give it enough time to complete a cycle
1468         def _watch():
1469             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1470         d.addCallback(lambda ignored: self.poll(_watch))
1471         def _check2(ignored):
1472             ss.bucket_counter.cpu_slice = orig_cpu_slice
1473             s = ss.bucket_counter.get_state()
1474             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1475             self.failIf("bogusprefix!" in s["storage-index-samples"],
1476                         s["storage-index-samples"].keys())
1477         d.addCallback(_check2)
1478         return d
1479
1480     def test_bucket_counter_eta(self):
1481         basedir = "storage/BucketCounter/bucket_counter_eta"
1482         fileutil.make_dirs(basedir)
1483         ss = MyStorageServer(basedir, "\x00" * 20)
1484         ss.bucket_counter.slow_start = 0
1485         # these will be fired inside finished_prefix()
1486         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1487         w = StorageStatus(ss)
1488
1489         d = defer.Deferred()
1490
1491         def _check_1(ignored):
1492             # no ETA is available yet
1493             html = w.renderSynchronously()
1494             s = remove_tags(html)
1495             self.failUnlessIn("complete (next work", s)
1496
1497         def _check_2(ignored):
1498             # one prefix has finished, so an ETA based upon that elapsed time
1499             # should be available.
1500             html = w.renderSynchronously()
1501             s = remove_tags(html)
1502             self.failUnlessIn("complete (ETA ", s)
1503
1504         def _check_3(ignored):
1505             # two prefixes have finished
1506             html = w.renderSynchronously()
1507             s = remove_tags(html)
1508             self.failUnlessIn("complete (ETA ", s)
1509             d.callback("done")
1510
1511         hooks[0].addCallback(_check_1).addErrback(d.errback)
1512         hooks[1].addCallback(_check_2).addErrback(d.errback)
1513         hooks[2].addCallback(_check_3).addErrback(d.errback)
1514
1515         ss.setServiceParent(self.s)
1516         return d
1517
1518 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1519     stop_after_first_bucket = False
1520     def process_bucket(self, *args, **kwargs):
1521         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1522         if self.stop_after_first_bucket:
1523             self.stop_after_first_bucket = False
1524             self.cpu_slice = -1.0
1525     def yielding(self, sleep_time):
1526         if not self.stop_after_first_bucket:
1527             self.cpu_slice = 500
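
# The trick above: process_bucket() drives cpu_slice below zero, so the
# crawler yields right after the first bucket; the yielding() override then
# restores a generous 500s slice so the rest of the cycle runs uninterrupted.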
1528
1529 class BrokenStatResults:
1530     pass
1531 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1532     def stat(self, fn):
1533         s = os.stat(fn)
1534         bsr = BrokenStatResults()
1535         for attrname in dir(s):
1536             if attrname.startswith("_"):
1537                 continue
1538             if attrname == "st_blocks":
1539                 continue
1540             setattr(bsr, attrname, getattr(s, attrname))
1541         return bsr
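
# (BrokenStatResults mimics a platform whose os.stat() results lack
# st_blocks: every other attribute is copied across, so the crawler's
# disk-usage accounting must fall back to the file size, which is what
# test_no_st_blocks verifies below)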
1542
1543 class InstrumentedStorageServer(StorageServer):
1544     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1545 class No_ST_BLOCKS_StorageServer(StorageServer):
1546     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1547
1548 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1549
1550     def setUp(self):
1551         self.s = service.MultiService()
1552         self.s.startService()
1553     def tearDown(self):
1554         return self.s.stopService()
1555
1556     def make_shares(self, ss):
1557         def make(si):
1558             return (si, hashutil.tagged_hash("renew", si),
1559                     hashutil.tagged_hash("cancel", si))
1560         def make_mutable(si):
1561             return (si, hashutil.tagged_hash("renew", si),
1562                     hashutil.tagged_hash("cancel", si),
1563                     hashutil.tagged_hash("write-enabler", si))
1564         def make_extra_lease(si, num):
1565             return (hashutil.tagged_hash("renew-%d" % num, si),
1566                     hashutil.tagged_hash("cancel-%d" % num, si))
1567
1568         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1569         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1570         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1571         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1572         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1573         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1574         sharenums = [0]
1575         canary = FakeCanary()
1576         # note: 'tahoe debug dump-share' will not handle this file, since the
1577         # inner contents are not a valid CHK share
1578         data = "\xff" * 1000
1579
1580         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1581                                          1000, canary)
1582         w[0].remote_write(0, data)
1583         w[0].remote_close()
1584
1585         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1586                                          1000, canary)
1587         w[0].remote_write(0, data)
1588         w[0].remote_close()
1589         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1590
1591         writev = ss.remote_slot_testv_and_readv_and_writev
1592         writev(mutable_si_2, (we2, rs2, cs2),
1593                {0: ([], [(0,data)], len(data))}, [])
1594         writev(mutable_si_3, (we3, rs3, cs3),
1595                {0: ([], [(0,data)], len(data))}, [])
1596         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1597
1598         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1599         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1600         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1601
1602     def test_basic(self):
1603         basedir = "storage/LeaseCrawler/basic"
1604         fileutil.make_dirs(basedir)
1605         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1606         # make it start sooner than usual.
1607         lc = ss.lease_checker
1608         lc.slow_start = 0
1609         lc.cpu_slice = 500
1610         lc.stop_after_first_bucket = True
1611         webstatus = StorageStatus(ss)
1612
1613         # create a few shares, with some leases on them
1614         self.make_shares(ss)
1615         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1616
1617         # add a non-sharefile to exercise another code path
1618         fn = os.path.join(ss.sharedir,
1619                           storage_index_to_dir(immutable_si_0),
1620                           "not-a-share")
1621         f = open(fn, "wb")
1622         f.write("I am not a share.\n")
1623         f.close()
1624
1625         # this is before the crawl has started, so we're not in a cycle yet
1626         initial_state = lc.get_state()
1627         self.failIf(lc.get_progress()["cycle-in-progress"])
1628         self.failIfIn("cycle-to-date", initial_state)
1629         self.failIfIn("estimated-remaining-cycle", initial_state)
1630         self.failIfIn("estimated-current-cycle", initial_state)
1631         self.failUnlessIn("history", initial_state)
1632         self.failUnlessEqual(initial_state["history"], {})
1633
1634         ss.setServiceParent(self.s)
1635
1636         DAY = 24*60*60
1637
1638         d = fireEventually()
1639
1640         # now examine the state right after the first bucket has been
1641         # processed.
1642         def _after_first_bucket(ignored):
1643             initial_state = lc.get_state()
1644             if "cycle-to-date" not in initial_state:
1645                 d2 = fireEventually()
1646                 d2.addCallback(_after_first_bucket)
1647                 return d2
1648             self.failUnlessIn("cycle-to-date", initial_state)
1649             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1650             self.failUnlessIn("estimated-current-cycle", initial_state)
1651             self.failUnlessIn("history", initial_state)
1652             self.failUnlessEqual(initial_state["history"], {})
1653
1654             so_far = initial_state["cycle-to-date"]
1655             self.failUnlessEqual(so_far["expiration-enabled"], False)
1656             self.failUnlessIn("configured-expiration-mode", so_far)
1657             self.failUnlessIn("lease-age-histogram", so_far)
1658             lah = so_far["lease-age-histogram"]
1659             self.failUnlessEqual(type(lah), list)
1660             self.failUnlessEqual(len(lah), 1)
1661             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
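            # (each lease-age-histogram entry is (min_age, max_age, count);
            # the single lease seen so far was created moments ago, so it
            # lands in the zero-to-one-day bucket)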
1662             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1663             self.failUnlessEqual(so_far["corrupt-shares"], [])
1664             sr1 = so_far["space-recovered"]
1665             self.failUnlessEqual(sr1["examined-buckets"], 1)
1666             self.failUnlessEqual(sr1["examined-shares"], 1)
1667             self.failUnlessEqual(sr1["actual-shares"], 0)
1668             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1669             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1670             left = initial_state["estimated-remaining-cycle"]
1671             sr2 = left["space-recovered"]
1672             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1673             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1674             self.failIfEqual(sr2["actual-shares"], None)
1675             self.failIfEqual(sr2["configured-diskbytes"], None)
1676             self.failIfEqual(sr2["original-sharebytes"], None)
1677         d.addCallback(_after_first_bucket)
1678         d.addCallback(lambda ign: self.render1(webstatus))
1679         def _check_html_in_cycle(html):
1680             s = remove_tags(html)
1681             self.failUnlessIn("So far, this cycle has examined "
1682                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1683             self.failUnlessIn("and has recovered: "
1684                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1685                               "0 B (0 B / 0 B)", s)
1686             self.failUnlessIn("If expiration were enabled, "
1687                               "we would have recovered: "
1688                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1689                               " 0 B (0 B / 0 B) by now", s)
1690             self.failUnlessIn("and the remainder of this cycle "
1691                               "would probably recover: "
1692                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1693                               " 0 B (0 B / 0 B)", s)
1694             self.failUnlessIn("and the whole cycle would probably recover: "
1695                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1696                               " 0 B (0 B / 0 B)", s)
1697             self.failUnlessIn("if we were strictly using each lease's default "
1698                               "31-day lease lifetime", s)
1699             self.failUnlessIn("this cycle would be expected to recover: ", s)
1700         d.addCallback(_check_html_in_cycle)
1701
1702         # wait for the crawler to finish the first cycle. Nothing should have
1703         # been removed.
1704         def _wait():
1705             return lc.get_state()["last-cycle-finished"] is not None
1706         d.addCallback(lambda ign: self.poll(_wait))
1707
1708         def _after_first_cycle(ignored):
1709             s = lc.get_state()
1710             self.failIf("cycle-to-date" in s)
1711             self.failIf("estimated-remaining-cycle" in s)
1712             self.failIf("estimated-current-cycle" in s)
1713             last = s["history"][0]
1714             self.failUnlessIn("cycle-start-finish-times", last)
1715             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1716             self.failUnlessEqual(last["expiration-enabled"], False)
1717             self.failUnlessIn("configured-expiration-mode", last)
1718
1719             self.failUnlessIn("lease-age-histogram", last)
1720             lah = last["lease-age-histogram"]
1721             self.failUnlessEqual(type(lah), list)
1722             self.failUnlessEqual(len(lah), 1)
1723             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1724
1725             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1726             self.failUnlessEqual(last["corrupt-shares"], [])
1727
1728             rec = last["space-recovered"]
1729             self.failUnlessEqual(rec["examined-buckets"], 4)
1730             self.failUnlessEqual(rec["examined-shares"], 4)
1731             self.failUnlessEqual(rec["actual-buckets"], 0)
1732             self.failUnlessEqual(rec["original-buckets"], 0)
1733             self.failUnlessEqual(rec["configured-buckets"], 0)
1734             self.failUnlessEqual(rec["actual-shares"], 0)
1735             self.failUnlessEqual(rec["original-shares"], 0)
1736             self.failUnlessEqual(rec["configured-shares"], 0)
1737             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1738             self.failUnlessEqual(rec["original-diskbytes"], 0)
1739             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1740             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1741             self.failUnlessEqual(rec["original-sharebytes"], 0)
1742             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1743
1744             def _get_sharefile(si):
1745                 return list(ss._iter_share_files(si))[0]
1746             def count_leases(si):
1747                 return len(list(_get_sharefile(si).get_leases()))
1748             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1749             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1750             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1751             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1752         d.addCallback(_after_first_cycle)
1753         d.addCallback(lambda ign: self.render1(webstatus))
1754         def _check_html(html):
1755             s = remove_tags(html)
1756             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1757                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1758             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1759                               "(2 mutable / 2 immutable),", s)
1760             self.failUnlessIn("but expiration was not enabled", s)
1761         d.addCallback(_check_html)
1762         d.addCallback(lambda ign: self.render_json(webstatus))
1763         def _check_json(json):
1764             data = simplejson.loads(json)
1765             self.failUnlessIn("lease-checker", data)
1766             self.failUnlessIn("lease-checker-progress", data)
1767         d.addCallback(_check_json)
1768         return d
1769
1770     def backdate_lease(self, sf, renew_secret, new_expire_time):
1771         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1772         # "renew" a lease with a new_expire_time that is older than what the
1773         # current lease has), so we have to reach inside it.
1774         for i,lease in enumerate(sf.get_leases()):
1775             if lease.renew_secret == renew_secret:
1776                 lease.expiration_time = new_expire_time
1777                 f = open(sf.home, 'rb+')
1778                 sf._write_lease_record(f, i, lease)
1779                 f.close()
1780                 return
1781         raise IndexError("unable to renew non-existent lease")
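
    # illustrative use (assuming sf is a share file holding a lease whose
    # renew secret is rs):
    #     self.backdate_lease(sf, rs, time.time() - 1000)
    # rewrites that lease record in place so the lease looks 1000s old.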
1782
1783     def test_expire_age(self):
1784         basedir = "storage/LeaseCrawler/expire_age"
1785         fileutil.make_dirs(basedir)
1786         # setting expiration_override_lease_duration to 2000 means that
1787         # any lease which is more than 2000s old will be expired.
1788         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1789                                        expiration_enabled=True,
1790                                        expiration_mode="age",
1791                                        expiration_override_lease_duration=2000)
1792         # make it start sooner than usual.
1793         lc = ss.lease_checker
1794         lc.slow_start = 0
1795         lc.stop_after_first_bucket = True
1796         webstatus = StorageStatus(ss)
1797
1798         # create a few shares, with some leases on them
1799         self.make_shares(ss)
1800         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1801
1802         def count_shares(si):
1803             return len(list(ss._iter_share_files(si)))
1804         def _get_sharefile(si):
1805             return list(ss._iter_share_files(si))[0]
1806         def count_leases(si):
1807             return len(list(_get_sharefile(si).get_leases()))
1808
1809         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1810         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1811         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1812         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1813         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1814         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1815         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1816         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1817
1818         # artificially crank back the expiration time on the first lease of
1819         # each share, to make it look like it expired already (age=1000s).
1820         # Some shares have an extra lease which is set to expire at the
1821         # default time in 31 days from now (age=31days). We then run the
1822         # crawler, which will expire the first lease, making some shares get
1823         # deleted and others stay alive (with one remaining lease)
1824         now = time.time()
1825
1826         sf0 = _get_sharefile(immutable_si_0)
1827         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1828         sf0_size = os.stat(sf0.home).st_size
1829
1830         # immutable_si_1 gets an extra lease
1831         sf1 = _get_sharefile(immutable_si_1)
1832         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1833
1834         sf2 = _get_sharefile(mutable_si_2)
1835         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1836         sf2_size = os.stat(sf2.home).st_size
1837
1838         # mutable_si_3 gets an extra lease
1839         sf3 = _get_sharefile(mutable_si_3)
1840         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1841
1842         ss.setServiceParent(self.s)
1843
1844         d = fireEventually()
1845         # examine the state right after the first bucket has been processed
1846         def _after_first_bucket(ignored):
1847             p = lc.get_progress()
1848             if not p["cycle-in-progress"]:
1849                 d2 = fireEventually()
1850                 d2.addCallback(_after_first_bucket)
1851                 return d2
1852         d.addCallback(_after_first_bucket)
1853         d.addCallback(lambda ign: self.render1(webstatus))
1854         def _check_html_in_cycle(html):
1855             s = remove_tags(html)
1856             # the first bucket encountered gets deleted, and its prefix
1857             # happens to be about 1/5th of the way through the ring, so the
1858             # predictor thinks we'll have 5 shares and that we'll delete them
1859             # all. This part of the test depends upon the SIs landing right
1860             # where they do now.
1861             self.failUnlessIn("The remainder of this cycle is expected to "
1862                               "recover: 4 shares, 4 buckets", s)
1863             self.failUnlessIn("The whole cycle is expected to examine "
1864                               "5 shares in 5 buckets and to recover: "
1865                               "5 shares, 5 buckets", s)
1866         d.addCallback(_check_html_in_cycle)
1867
1868         # wait for the crawler to finish the first cycle. Two shares should
1869         # have been removed
1870         def _wait():
1871             return lc.get_state()["last-cycle-finished"] is not None
1872         d.addCallback(lambda ign: self.poll(_wait))
1873
1874         def _after_first_cycle(ignored):
1875             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1876             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1877             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1878             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1879             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1880             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1881
1882             s = lc.get_state()
1883             last = s["history"][0]
1884
1885             self.failUnlessEqual(last["expiration-enabled"], True)
1886             self.failUnlessEqual(last["configured-expiration-mode"],
1887                                  ("age", 2000, None, ("mutable", "immutable")))
1888             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1889
1890             rec = last["space-recovered"]
1891             self.failUnlessEqual(rec["examined-buckets"], 4)
1892             self.failUnlessEqual(rec["examined-shares"], 4)
1893             self.failUnlessEqual(rec["actual-buckets"], 2)
1894             self.failUnlessEqual(rec["original-buckets"], 2)
1895             self.failUnlessEqual(rec["configured-buckets"], 2)
1896             self.failUnlessEqual(rec["actual-shares"], 2)
1897             self.failUnlessEqual(rec["original-shares"], 2)
1898             self.failUnlessEqual(rec["configured-shares"], 2)
1899             size = sf0_size + sf2_size
1900             self.failUnlessEqual(rec["actual-sharebytes"], size)
1901             self.failUnlessEqual(rec["original-sharebytes"], size)
1902             self.failUnlessEqual(rec["configured-sharebytes"], size)
1903             # different platforms have different notions of "blocks used by
1904             # this file", so merely assert that it's a number
1905             self.failUnless(rec["actual-diskbytes"] >= 0,
1906                             rec["actual-diskbytes"])
1907             self.failUnless(rec["original-diskbytes"] >= 0,
1908                             rec["original-diskbytes"])
1909             self.failUnless(rec["configured-diskbytes"] >= 0,
1910                             rec["configured-diskbytes"])
1911         d.addCallback(_after_first_cycle)
1912         d.addCallback(lambda ign: self.render1(webstatus))
1913         def _check_html(html):
1914             s = remove_tags(html)
1915             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1916             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1917             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1918         d.addCallback(_check_html)
1919         return d
1920
1921     def test_expire_cutoff_date(self):
1922         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1923         fileutil.make_dirs(basedir)
1924         # setting cutoff-date to 2000 seconds ago means that any lease which
1925         # is more than 2000s old will be expired.
1926         now = time.time()
1927         then = int(now - 2000)
1928         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1929                                        expiration_enabled=True,
1930                                        expiration_mode="cutoff-date",
1931                                        expiration_cutoff_date=then)
1932         # make it start sooner than usual.
1933         lc = ss.lease_checker
1934         lc.slow_start = 0
1935         lc.stop_after_first_bucket = True
1936         webstatus = StorageStatus(ss)
1937
1938         # create a few shares, with some leases on them
1939         self.make_shares(ss)
1940         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1941
1942         def count_shares(si):
1943             return len(list(ss._iter_share_files(si)))
1944         def _get_sharefile(si):
1945             return list(ss._iter_share_files(si))[0]
1946         def count_leases(si):
1947             return len(list(_get_sharefile(si).get_leases()))
1948
1949         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1950         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1951         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1952         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1953         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1954         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1955         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1956         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1957
1958         # artificially crank back the expiration time on the first lease of
1959         # each share, to make it look like it was renewed 3000s ago. To achieve
1960         # this, we need to set the expiration time to now-3000+31days. This
1961         # will change when the lease format is improved to contain both
1962         # create/renew time and duration.
1963         new_expiration_time = now - 3000 + 31*24*60*60
1964
1965         # Some shares have an extra lease which is set to expire at the
1966         # default time in 31 days from now (age=31days). We then run the
1967         # crawler, which will expire the first lease, making some shares get
1968         # deleted and others stay alive (with one remaining lease)
1969
1970         sf0 = _get_sharefile(immutable_si_0)
1971         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1972         sf0_size = os.stat(sf0.home).st_size
1973
1974         # immutable_si_1 gets an extra lease
1975         sf1 = _get_sharefile(immutable_si_1)
1976         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1977
1978         sf2 = _get_sharefile(mutable_si_2)
1979         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1980         sf2_size = os.stat(sf2.home).st_size
1981
1982         # mutable_si_3 gets an extra lease
1983         sf3 = _get_sharefile(mutable_si_3)
1984         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1985
1986         ss.setServiceParent(self.s)
1987
1988         d = fireEventually()
1989         # examine the state right after the first bucket has been processed
1990         def _after_first_bucket(ignored):
1991             p = lc.get_progress()
1992             if not p["cycle-in-progress"]:
1993                 d2 = fireEventually()
1994                 d2.addCallback(_after_first_bucket)
1995                 return d2
1996         d.addCallback(_after_first_bucket)
1997         d.addCallback(lambda ign: self.render1(webstatus))
1998         def _check_html_in_cycle(html):
1999             s = remove_tags(html)
2000             # the first bucket encountered gets deleted, and its prefix
2001             # happens to be about 1/5th of the way through the ring, so the
2002             # predictor thinks we'll have 5 shares and that we'll delete them
2003             # all. This part of the test depends upon the SIs landing right
2004             # where they do now.
2005             self.failUnlessIn("The remainder of this cycle is expected to "
2006                               "recover: 4 shares, 4 buckets", s)
2007             self.failUnlessIn("The whole cycle is expected to examine "
2008                               "5 shares in 5 buckets and to recover: "
2009                               "5 shares, 5 buckets", s)
2010         d.addCallback(_check_html_in_cycle)
2011
2012         # wait for the crawler to finish the first cycle. Two shares should
2013         # have been removed
2014         def _wait():
2015             return lc.get_state()["last-cycle-finished"] is not None
2016         d.addCallback(lambda ign: self.poll(_wait))
2017
2018         def _after_first_cycle(ignored):
2019             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2020             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2021             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2022             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2023             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2024             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2025
2026             s = lc.get_state()
2027             last = s["history"][0]
2028
2029             self.failUnlessEqual(last["expiration-enabled"], True)
2030             self.failUnlessEqual(last["configured-expiration-mode"],
2031                                  ("cutoff-date", None, then,
2032                                   ("mutable", "immutable")))
2033             self.failUnlessEqual(last["leases-per-share-histogram"],
2034                                  {1: 2, 2: 2})
2035
2036             rec = last["space-recovered"]
2037             self.failUnlessEqual(rec["examined-buckets"], 4)
2038             self.failUnlessEqual(rec["examined-shares"], 4)
2039             self.failUnlessEqual(rec["actual-buckets"], 2)
2040             self.failUnlessEqual(rec["original-buckets"], 0)
2041             self.failUnlessEqual(rec["configured-buckets"], 2)
2042             self.failUnlessEqual(rec["actual-shares"], 2)
2043             self.failUnlessEqual(rec["original-shares"], 0)
2044             self.failUnlessEqual(rec["configured-shares"], 2)
2045             size = sf0_size + sf2_size
2046             self.failUnlessEqual(rec["actual-sharebytes"], size)
2047             self.failUnlessEqual(rec["original-sharebytes"], 0)
2048             self.failUnlessEqual(rec["configured-sharebytes"], size)
2049             # different platforms have different notions of "blocks used by
2050             # this file", so merely assert that it's a number
2051             self.failUnless(rec["actual-diskbytes"] >= 0,
2052                             rec["actual-diskbytes"])
2053             self.failUnless(rec["original-diskbytes"] >= 0,
2054                             rec["original-diskbytes"])
2055             self.failUnless(rec["configured-diskbytes"] >= 0,
2056                             rec["configured-diskbytes"])
2057         d.addCallback(_after_first_cycle)
2058         d.addCallback(lambda ign: self.render1(webstatus))
2059         def _check_html(html):
2060             s = remove_tags(html)
2061             self.failUnlessIn("Expiration Enabled:"
2062                               " expired leases will be removed", s)
2063             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2064             substr = "Leases created or last renewed before %s will be considered expired." % date
2065             self.failUnlessIn(substr, s)
2066             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2067         d.addCallback(_check_html)
2068         return d
2069
2070     def test_only_immutable(self):
2071         basedir = "storage/LeaseCrawler/only_immutable"
2072         fileutil.make_dirs(basedir)
2073         now = time.time()
2074         then = int(now - 2000)
2075         ss = StorageServer(basedir, "\x00" * 20,
2076                            expiration_enabled=True,
2077                            expiration_mode="cutoff-date",
2078                            expiration_cutoff_date=then,
2079                            expiration_sharetypes=("immutable",))
2080         lc = ss.lease_checker
2081         lc.slow_start = 0
2082         webstatus = StorageStatus(ss)
2083
2084         self.make_shares(ss)
2085         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2086         # set all leases to be expirable
2087         new_expiration_time = now - 3000 + 31*24*60*60
2088
2089         def count_shares(si):
2090             return len(list(ss._iter_share_files(si)))
2091         def _get_sharefile(si):
2092             return list(ss._iter_share_files(si))[0]
2093         def count_leases(si):
2094             return len(list(_get_sharefile(si).get_leases()))
2095
2096         sf0 = _get_sharefile(immutable_si_0)
2097         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2098         sf1 = _get_sharefile(immutable_si_1)
2099         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2100         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2101         sf2 = _get_sharefile(mutable_si_2)
2102         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2103         sf3 = _get_sharefile(mutable_si_3)
2104         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2105         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2106
2107         ss.setServiceParent(self.s)
2108         def _wait():
2109             return lc.get_state()["last-cycle-finished"] is not None
2110         d = self.poll(_wait)
2111
2112         def _after_first_cycle(ignored):
2113             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2114             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2115             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2116             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2117             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2118             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2119         d.addCallback(_after_first_cycle)
2120         d.addCallback(lambda ign: self.render1(webstatus))
2121         def _check_html(html):
2122             s = remove_tags(html)
2123             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2124         d.addCallback(_check_html)
2125         return d
2126
2127     def test_only_mutable(self):
2128         basedir = "storage/LeaseCrawler/only_mutable"
2129         fileutil.make_dirs(basedir)
2130         now = time.time()
2131         then = int(now - 2000)
2132         ss = StorageServer(basedir, "\x00" * 20,
2133                            expiration_enabled=True,
2134                            expiration_mode="cutoff-date",
2135                            expiration_cutoff_date=then,
2136                            expiration_sharetypes=("mutable",))
2137         lc = ss.lease_checker
2138         lc.slow_start = 0
2139         webstatus = StorageStatus(ss)
2140
2141         self.make_shares(ss)
2142         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2143         # set all leases to be expirable
2144         new_expiration_time = now - 3000 + 31*24*60*60
2145
2146         def count_shares(si):
2147             return len(list(ss._iter_share_files(si)))
2148         def _get_sharefile(si):
2149             return list(ss._iter_share_files(si))[0]
2150         def count_leases(si):
2151             return len(list(_get_sharefile(si).get_leases()))
2152
2153         sf0 = _get_sharefile(immutable_si_0)
2154         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2155         sf1 = _get_sharefile(immutable_si_1)
2156         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2157         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2158         sf2 = _get_sharefile(mutable_si_2)
2159         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2160         sf3 = _get_sharefile(mutable_si_3)
2161         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2162         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2163
2164         ss.setServiceParent(self.s)
2165         def _wait():
2166             return lc.get_state()["last-cycle-finished"] is not None
2167         d = self.poll(_wait)
2168
2169         def _after_first_cycle(ignored):
2170             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2171             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2172             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2173             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2174             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2175             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2176         d.addCallback(_after_first_cycle)
2177         d.addCallback(lambda ign: self.render1(webstatus))
2178         def _check_html(html):
2179             s = remove_tags(html)
2180             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2181         d.addCallback(_check_html)
2182         return d
2183
2184     def test_bad_mode(self):
2185         basedir = "storage/LeaseCrawler/bad_mode"
2186         fileutil.make_dirs(basedir)
2187         e = self.failUnlessRaises(ValueError,
2188                                   StorageServer, basedir, "\x00" * 20,
2189                                   expiration_mode="bogus")
2190         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2191
2192     def test_parse_duration(self):
2193         DAY = 24*60*60
2194         MONTH = 31*DAY
2195         YEAR = 365*DAY
2196         p = time_format.parse_duration
2197         self.failUnlessEqual(p("7days"), 7*DAY)
2198         self.failUnlessEqual(p("31day"), 31*DAY)
2199         self.failUnlessEqual(p("60 days"), 60*DAY)
2200         self.failUnlessEqual(p("2mo"), 2*MONTH)
2201         self.failUnlessEqual(p("3 month"), 3*MONTH)
2202         self.failUnlessEqual(p("2years"), 2*YEAR)
2203         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2204         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2205
2206     def test_parse_date(self):
2207         p = time_format.parse_date
2208         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2209         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2210
2211     def test_limited_history(self):
2212         basedir = "storage/LeaseCrawler/limited_history"
2213         fileutil.make_dirs(basedir)
2214         ss = StorageServer(basedir, "\x00" * 20)
2215         # make it start sooner than usual.
2216         lc = ss.lease_checker
2217         lc.slow_start = 0
2218         lc.cpu_slice = 500
2219
2220         # create a few shares, with some leases on them
2221         self.make_shares(ss)
2222
2223         ss.setServiceParent(self.s)
2224
2225         def _wait_until_15_cycles_done():
2226             last = lc.state["last-cycle-finished"]
2227             if last is not None and last >= 15:
2228                 return True
2229             if lc.timer:
2230                 lc.timer.reset(0)
2231             return False
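        # (resetting lc.timer to 0 collapses the between-cycle delay, so
        # fifteen full crawler cycles can complete quickly under the poll
        # below)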
2232         d = self.poll(_wait_until_15_cycles_done)
2233
2234         def _check(ignored):
2235             s = lc.get_state()
2236             h = s["history"]
2237             self.failUnlessEqual(len(h), 10)
2238             self.failUnlessEqual(max(h.keys()), 15)
2239             self.failUnlessEqual(min(h.keys()), 6)
2240         d.addCallback(_check)
2241         return d
2242
2243     def test_unpredictable_future(self):
2244         basedir = "storage/LeaseCrawler/unpredictable_future"
2245         fileutil.make_dirs(basedir)
2246         ss = StorageServer(basedir, "\x00" * 20)
2247         # make it start sooner than usual.
2248         lc = ss.lease_checker
2249         lc.slow_start = 0
2250         lc.cpu_slice = -1.0 # stop quickly
2251
2252         self.make_shares(ss)
2253
2254         ss.setServiceParent(self.s)
2255
2256         d = fireEventually()
2257         def _check(ignored):
2258             # this should fire after the first bucket is complete, but before
2259             # the first prefix is complete, so the progress-measurer won't
2260             # think we've gotten far enough to raise our percent-complete
2261             # above 0%, triggering the cannot-predict-the-future code in
2262             # expirer.py . This will have to change if/when the
2263             # progress-measurer gets smart enough to count buckets (we'll
2264             # have to interrupt it even earlier, before it's finished the
2265             # first bucket).
2266             s = lc.get_state()
2267             if "cycle-to-date" not in s:
2268                 d2 = fireEventually()
2269                 d2.addCallback(_check)
2270                 return d2
2271             self.failUnlessIn("cycle-to-date", s)
2272             self.failUnlessIn("estimated-remaining-cycle", s)
2273             self.failUnlessIn("estimated-current-cycle", s)
2274
2275             left = s["estimated-remaining-cycle"]["space-recovered"]
2276             self.failUnlessEqual(left["actual-buckets"], None)
2277             self.failUnlessEqual(left["original-buckets"], None)
2278             self.failUnlessEqual(left["configured-buckets"], None)
2279             self.failUnlessEqual(left["actual-shares"], None)
2280             self.failUnlessEqual(left["original-shares"], None)
2281             self.failUnlessEqual(left["configured-shares"], None)
2282             self.failUnlessEqual(left["actual-diskbytes"], None)
2283             self.failUnlessEqual(left["original-diskbytes"], None)
2284             self.failUnlessEqual(left["configured-diskbytes"], None)
2285             self.failUnlessEqual(left["actual-sharebytes"], None)
2286             self.failUnlessEqual(left["original-sharebytes"], None)
2287             self.failUnlessEqual(left["configured-sharebytes"], None)
2288
2289             full = s["estimated-current-cycle"]["space-recovered"]
2290             self.failUnlessEqual(full["actual-buckets"], None)
2291             self.failUnlessEqual(full["original-buckets"], None)
2292             self.failUnlessEqual(full["configured-buckets"], None)
2293             self.failUnlessEqual(full["actual-shares"], None)
2294             self.failUnlessEqual(full["original-shares"], None)
2295             self.failUnlessEqual(full["configured-shares"], None)
2296             self.failUnlessEqual(full["actual-diskbytes"], None)
2297             self.failUnlessEqual(full["original-diskbytes"], None)
2298             self.failUnlessEqual(full["configured-diskbytes"], None)
2299             self.failUnlessEqual(full["actual-sharebytes"], None)
2300             self.failUnlessEqual(full["original-sharebytes"], None)
2301             self.failUnlessEqual(full["configured-sharebytes"], None)
2302
2303         d.addCallback(_check)
2304         return d
2305
2306     def test_no_st_blocks(self):
2307         basedir = "storage/LeaseCrawler/no_st_blocks"
2308         fileutil.make_dirs(basedir)
2309         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2310                                         expiration_mode="age",
2311                                         expiration_override_lease_duration=-1000)
        # a negative expiration_override_lease_duration= means every lease
        # has already expired, so the "configured-" space-recovered counts
        # will be non-zero
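        # (e.g. a lease granted at time t with an override duration of
        # -1000 is treated as having expired at t-1000, long before the
        # crawler looks at it)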

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
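            # (a rough sketch of the presumed fallback, not the real code:
            #      blocks = getattr(st, "st_blocks", None)
            #      if blocks is None:
            #          diskbytes = st.st_size      # == sharebytes
            #      else:
            #          diskbytes = blocks * 512
            # so with no st_blocks, disk usage degrades to the file size)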
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
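        # (a rough sketch, assuming get_share_file() is implemented along
        #  these lines:
        #      prefix = open(fn, "rb").read(32)
        #      if prefix == MutableShareFile.MAGIC:
        #          return MutableShareFile(fn)
        #      return ShareFile(fn)   # immutable; validates its version int
        #  "BAD MAGIC" matches neither format, so we always end up on the
        #  immutable path and fail its version check)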

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)
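        # together with the four shares above, the crawler should now find
        # five buckets in all, of which only three shares remain readable
        # (one of the four was just corrupted)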

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
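            # e.g. simplejson.loads(simplejson.dumps({1: ("a", 0)}))
            # comes back as {"1": ["a", 0]}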
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d
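    # render_json() requests the same page with ?t=json, i.e. the
    # machine-readable rendering that the _check_json callbacks above
    # parse with simplejson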

class NoDiskStatsServer(StorageServer):
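    # stands in for a platform with no disk-stats API at all (the
    # AttributeError mimics e.g. a missing os.statvfs)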
    def get_disk_stats(self):
        raise AttributeError

class BadDiskStatsServer(StorageServer):
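    # stands in for a platform where the disk-stats API exists but the
    # call itself fails at runtime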
    def get_disk_stats(self):
        raise OSError

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

    def test_status_no_disk_stats(self):
        # Some platforms may have no disk stats API. Make sure the code can handle that
        # (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = NoDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    def test_status_bad_disk_stats(self):
        # If the API to get disk stats exists but a call to it fails, then the status should
        # show that no shares will be accepted, and get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = BadDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)
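    # note the asymmetry between the two failure modes: with no disk-stats
    # API at all, available space is unknown (None) and shares are still
    # accepted, but when the API is present and failing, the server plays
    # it safe, reporting 0 bytes available and refusing new shares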

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e13)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 100.00 TB (100000000000000)", s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
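    # (remove_prefix is presumably just a guarded slice, something like:
    #      def remove_prefix(s, prefix):
    #          if not s.startswith(prefix):
    #              return None
    #          return s[len(prefix):]
    #  which matches both assertions above)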