
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
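
# FakeCanary stands in for the Foolscap "canary" object that a real client
# passes to remote_allocate_buckets(): it records the disconnect callbacks
# registered via notifyOnDisconnect() so that tests can invoke them by hand
# to simulate a client disconnecting.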
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

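# FakeStatsProvider quietly swallows the count() and register_producer()
# calls that a StorageServer makes on its stats_provider.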
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

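# These tests pass the TestCase instance itself to BucketWriter and
# BucketReader in place of a StorageServer, which is why the test classes
# below provide bucket_writer_closed/add_latency/count stubs.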
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

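# RemoteBucket simulates a Foolscap RemoteReference wrapped around a local
# object: callRemote("foo", ...) becomes a maybeDeferred invocation of the
# target's remote_foo() method.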
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
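        # (the two header sizes reflect the offset table: v1 stores its
        # offsets as 4-byte fields, giving a 0x24-byte header, while v2
        # widens them to 8 bytes, 0x44 in all, so shares larger than 4 GiB
        # can be addressed)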

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

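# FakeDiskStorageServer overrides get_disk_stats() so tests can dictate the
# apparent free disk space (via DISKAVAIL) instead of measuring the real
# filesystem.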
class FakeDiskStorageServer(StorageServer):
    DISKAVAIL = 0
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL,
                 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        Adding a second share whose storage index shares lots of leading
        bits with an extant share (but is not exactly the same storage
        index) should not add a new entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
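        # closing the last share should have pruned the per-bucket and
        # per-prefix directories under incoming/, while leaving the
        # top-level incoming/ directory itself in place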
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnless("BucketReader" in b_str, b_str)
        self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnless(" had version 0 but we wanted 1" in str(e), e)

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
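        # (the immutable share file begins with three 4-byte header fields;
        # each lease is a 4-byte owner number, two 32-byte secrets, and a
        # 4-byte expiration time)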
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_disk_stats(self):
        # This will spuriously fail if there is zero disk space left (but so will other tests).
        ss = self.create("test_disk_stats", reserved_space=0)

        disk = ss.get_disk_stats()
        self.failUnless(disk['total'] > 0, disk['total'])
        self.failUnless(disk['used'] > 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)

        disk = ss.get_disk_stats()
        self.failUnlessEqual(disk['avail'], 0)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100
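        # leases are keyed by (renew_secret, cancel_secret) pairs: only the
        # matching renew secret can renew a lease, and only the matching
        # cancel secret can cancel it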

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)


class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
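        # an empty test vector always matches, an empty write vector writes
        # nothing, and new_length=None leaves the container size alone, so
        # this merely creates the requested slots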
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the comparison operators; the data being compared is "11111"
        # in all cases. Test both failure and success, resetting the data
        # after each case.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

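        # each test vector is (offset, length, operator, specimen): the
        # write vector is applied only if comparing the existing data
        # against the specimen succeeds, and the read vector is evaluated
        # (before any write) and returned in the answer either way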
        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
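        # each value of n yields a distinct (renew, cancel) pair under the
        # same write enabler, so writing with different n values piles up
        # additional leases on the same share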
1115         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1116         write = ss.remote_slot_testv_and_readv_and_writev
1117         read = ss.remote_slot_readv
1118         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1119         self.failUnlessEqual(rc, (True, {}))
1120
1121         # create a random non-numeric file in the bucket directory, to
1122         # exercise the code that's supposed to ignore those.
1123         bucket_dir = os.path.join(self.workdir("test_leases"),
1124                                   "shares", storage_index_to_dir("si1"))
1125         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1126         f.write("you ought to be ignoring me\n")
1127         f.close()
1128
1129         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1130         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1131
1132         # add-lease on a missing storage index is silently ignored
1133         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1134
1135         # re-allocate the slots and use the same secrets, that should update
1136         # the lease
1137         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1138         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1139
1140         # renew it directly
1141         ss.remote_renew_lease("si1", secrets(0)[1])
1142         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1143
1144         # now allocate them with a bunch of different secrets, to trigger the
1145         # extended lease code. Use add_lease for one of them.
1146         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1147         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1148         secrets2 = secrets(2)
1149         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1150         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1151         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1152         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1153         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1154
1155         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1156
1157         # cancel one of them
1158         ss.remote_cancel_lease("si1", secrets(5)[2])
1159         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1160
1161         all_leases = list(s0.get_leases())
1162         # and write enough data to expand the container, forcing the server
1163         # to move the leases
1164         write("si1", secrets(0),
1165               {0: ([], [(0,data)], 200), },
1166               [])
1167
1168         # read back the leases, make sure they're still intact.
1169         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1170
1171         ss.remote_renew_lease("si1", secrets(0)[1])
1172         ss.remote_renew_lease("si1", secrets(1)[1])
1173         ss.remote_renew_lease("si1", secrets(2)[1])
1174         ss.remote_renew_lease("si1", secrets(3)[1])
1175         ss.remote_renew_lease("si1", secrets(4)[1])
1176         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1177         # get a new copy of the leases, with the current timestamps. Reading
1178         # data and failing to renew/cancel leases should leave the timestamps
1179         # alone.
1180         all_leases = list(s0.get_leases())
1181         # renewing with a bogus token should prompt an error message
1182
1183         # examine the exception thus raised, make sure the old nodeid is
1184         # present, to provide for share migration
1185         e = self.failUnlessRaises(IndexError,
1186                                   ss.remote_renew_lease, "si1",
1187                                   secrets(20)[1])
1188         e_s = str(e)
1189         self.failUnless("Unable to renew non-existent lease" in e_s)
1190         self.failUnless("I have leases accepted by nodeids:" in e_s)
1191         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1192
1193         # same for cancelling
1194         self.failUnlessRaises(IndexError,
1195                               ss.remote_cancel_lease, "si1",
1196                               secrets(20)[2])
1197         self.compare_leases(all_leases, list(s0.get_leases()))
1198
1199         # reading shares should not modify the timestamp
1200         read("si1", [], [(0,200)])
1201         self.compare_leases(all_leases, list(s0.get_leases()))
1202
1203         write("si1", secrets(0),
1204               {0: ([], [(200, "make me bigger")], None)}, [])
1205         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1206
1207         write("si1", secrets(0),
1208               {0: ([], [(500, "make me really bigger")], None)}, [])
1209         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1210
1211         # now cancel them all
1212         ss.remote_cancel_lease("si1", secrets(0)[2])
1213         ss.remote_cancel_lease("si1", secrets(1)[2])
1214         ss.remote_cancel_lease("si1", secrets(2)[2])
1215         ss.remote_cancel_lease("si1", secrets(3)[2])
1216
1217         # the slot should still be there
1218         remaining_shares = read("si1", [], [(0,10)])
1219         self.failUnlessEqual(len(remaining_shares), 1)
1220         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1221
1222         # cancelling a non-existent lease should raise an IndexError
1223         self.failUnlessRaises(IndexError,
1224                               ss.remote_cancel_lease, "si1", "nonsecret")
1225
1226         # and the slot should still be there
1227         remaining_shares = read("si1", [], [(0,10)])
1228         self.failUnlessEqual(len(remaining_shares), 1)
1229         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1230
1231         ss.remote_cancel_lease("si1", secrets(4)[2])
1232         # now the slot should be gone
1233         no_shares = read("si1", [], [(0,10)])
1234         self.failUnlessEqual(no_shares, {})
1235
1236         # cancelling a lease on a non-existent share should raise an IndexError
1237         self.failUnlessRaises(IndexError,
1238                               ss.remote_cancel_lease, "si2", "nonsecret")
1239
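    # The (write-enabler, renew-secret, cancel-secret) triple used above is
    # just three shared secrets the server compares byte-for-byte; a client
    # would typically derive them from tagged hashes, much as the
    # LeaseCrawler tests below do. A minimal sketch (the tag strings here
    # are illustrative, not the real client-side derivation):
    def _example_derive_secrets(self, base):
        we = hashutil.tagged_hash("example-write-enabler", base)
        rs = hashutil.tagged_hash("example-renew-secret", base)
        cs = hashutil.tagged_hash("example-cancel-secret", base)
        return (we, rs, cs)
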
1240     def test_remove(self):
1241         ss = self.create("test_remove")
1242         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1243                       set([0,1,2]), 100)
1244         readv = ss.remote_slot_readv
1245         writev = ss.remote_slot_testv_and_readv_and_writev
1246         secrets = ( self.write_enabler("we1"),
1247                     self.renew_secret("we1"),
1248                     self.cancel_secret("we1") )
1249         # delete sh0 by setting its size to zero
1250         answer = writev("si1", secrets,
1251                         {0: ([], [], 0)},
1252                         [])
1253         # the answer should mention all the shares that existed before the
1254         # write
1255         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1256         # but a new read should show only sh1 and sh2
1257         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1258                              {1: [""], 2: [""]})
1259
1260         # delete sh1 by setting its size to zero
1261         answer = writev("si1", secrets,
1262                         {1: ([], [], 0)},
1263                         [])
1264         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1265         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1266                              {2: [""]})
1267
1268         # delete sh2 by setting its size to zero
1269         answer = writev("si1", secrets,
1270                         {2: ([], [], 0)},
1271                         [])
1272         self.failUnlessEqual(answer, (True, {2:[]}) )
1273         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1274                              {})
1275         # and the bucket directory should now be gone
1276         si = base32.b2a("si1")
1277         # note: this is a detail of the storage server implementation, and
1278         # may change in the future
1279         prefix = si[:2]
1280         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1281         bucketdir = os.path.join(prefixdir, si)
1282         self.failUnless(os.path.exists(prefixdir))
1283         self.failIf(os.path.exists(bucketdir))
1284
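# The bucket-directory layout asserted at the end of test_remove can be
# sketched as a standalone helper, mirroring storage_index_to_dir(): a
# two-character prefix directory, then the full base32-encoded storage
# index (an implementation detail, as the comment above notes):
def _example_bucket_path(sharedir, si):
    si_b32 = base32.b2a(si)
    return os.path.join(sharedir, si_b32[:2], si_b32)
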
1285 class Stats(unittest.TestCase):
1286
1287     def setUp(self):
1288         self.sparent = LoggingServiceParent()
1289         self._lease_secret = itertools.count()
1290     def tearDown(self):
1291         return self.sparent.stopService()
1292
1293     def workdir(self, name):
1294         basedir = os.path.join("storage", "Server", name)
1295         return basedir
1296
1297     def create(self, name):
1298         workdir = self.workdir(name)
1299         ss = StorageServer(workdir, "\x00" * 20)
1300         ss.setServiceParent(self.sparent)
1301         return ss
1302
1303     def test_latencies(self):
1304         ss = self.create("test_latencies")
1305         for i in range(10000):
1306             ss.add_latency("allocate", 1.0 * i)
1307         for i in range(1000):
1308             ss.add_latency("renew", 1.0 * i)
1309         for i in range(10):
1310             ss.add_latency("cancel", 2.0 * i)
1311         ss.add_latency("get", 5.0)
1312
1313         output = ss.get_latencies()
1314
1315         self.failUnlessEqual(sorted(output.keys()),
1316                              sorted(["allocate", "renew", "cancel", "get"]))
1317         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1318         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1319         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1320         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1321         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1322         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1323         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1324         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1325         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1326
1327         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1328         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1329         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1330         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1331         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1332         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1333         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1334         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1335         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1336
1337         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1338         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1339         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1340         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1341         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1342         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1343         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1344         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1345         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1346
1347         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1348         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1349         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1350         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1351         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1352         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1353         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1354         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1355         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1356
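# The percentile figures asserted in test_latencies are consistent with a
# simple nearest-rank scheme over the retained samples; a sketch (assuming
# the server sorts whatever samples it has kept):
def _example_percentile(samples, fraction):
    # e.g. fraction=0.99 over the retained allocate samples 9000..9999
    # picks index 990, i.e. the value 9990 asserted above
    ordered = sorted(samples)
    return ordered[int(len(ordered) * fraction)]
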
1357 def remove_tags(s):
1358     s = re.sub(r'<[^>]*>', ' ', s)
1359     s = re.sub(r'\s+', ' ', s)
1360     return s
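# a quick illustration of remove_tags: markup collapses to space-separated
# text, e.g. remove_tags("<h1>Status</h1><p>ok</p>") == " Status ok "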
1361
1362 class MyBucketCountingCrawler(BucketCountingCrawler):
1363     def finished_prefix(self, cycle, prefix):
1364         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1365         if self.hook_ds:
1366             d = self.hook_ds.pop(0)
1367             d.callback(None)
1368
1369 class MyStorageServer(StorageServer):
1370     def add_bucket_counter(self):
1371         statefile = os.path.join(self.storedir, "bucket_counter.state")
1372         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1373         self.bucket_counter.setServiceParent(self)
1374
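# MyBucketCountingCrawler and MyStorageServer form a small test hook: the
# test pre-loads hook_ds with Deferreds, and each finished prefix pops and
# fires the next one, letting the test observe the crawler mid-cycle (see
# test_bucket_counter_eta below).
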
1375 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1376
1377     def setUp(self):
1378         self.s = service.MultiService()
1379         self.s.startService()
1380     def tearDown(self):
1381         return self.s.stopService()
1382
1383     def test_bucket_counter(self):
1384         basedir = "storage/BucketCounter/bucket_counter"
1385         fileutil.make_dirs(basedir)
1386         ss = StorageServer(basedir, "\x00" * 20)
1387         # to make sure we capture the bucket-counting-crawler in the middle
1388         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1389         # also make it start sooner than usual.
1390         ss.bucket_counter.slow_start = 0
1391         orig_cpu_slice = ss.bucket_counter.cpu_slice
1392         ss.bucket_counter.cpu_slice = 0
1393         ss.setServiceParent(self.s)
1394
1395         w = StorageStatus(ss)
1396
1397         # this sample is before the crawler has started doing anything
1398         html = w.renderSynchronously()
1399         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1400         s = remove_tags(html)
1401         self.failUnless("Accepting new shares: Yes" in s, s)
1402         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1403         self.failUnless("Total buckets: Not computed yet" in s, s)
1404         self.failUnless("Next crawl in" in s, s)
1405
1406         # give the bucket-counting-crawler one tick to get started. The
1407         # cpu_slice=0 will force it to yield right after it processes the
1408         # first prefix
1409
1410         d = fireEventually()
1411         def _check(ignored):
1412             # are we really right after the first prefix?
1413             state = ss.bucket_counter.get_state()
1414             self.failUnlessEqual(state["last-complete-prefix"],
1415                                  ss.bucket_counter.prefixes[0])
1416             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1417             html = w.renderSynchronously()
1418             s = remove_tags(html)
1419             self.failUnless(" Current crawl " in s, s)
1420             self.failUnless(" (next work in " in s, s)
1421         d.addCallback(_check)
1422
1423         # now give it enough time to complete a full cycle
1424         def _watch():
1425             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1426         d.addCallback(lambda ignored: self.poll(_watch))
1427         def _check2(ignored):
1428             ss.bucket_counter.cpu_slice = orig_cpu_slice
1429             html = w.renderSynchronously()
1430             s = remove_tags(html)
1431             self.failUnless("Total buckets: 0 (the number of" in s, s)
1432             self.failUnless("Next crawl in 59 minutes" in s, s)
1433         d.addCallback(_check2)
1434         return d
1435
1436     def test_bucket_counter_cleanup(self):
1437         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1438         fileutil.make_dirs(basedir)
1439         ss = StorageServer(basedir, "\x00" * 20)
1440         # to make sure we capture the bucket-counting-crawler in the middle
1441         # of a cycle, we reach in and reduce its maximum slice time to 0.
1442         ss.bucket_counter.slow_start = 0
1443         orig_cpu_slice = ss.bucket_counter.cpu_slice
1444         ss.bucket_counter.cpu_slice = 0
1445         ss.setServiceParent(self.s)
1446
1447         d = fireEventually()
1448
1449         def _after_first_prefix(ignored):
1450             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1451             # now sneak in and mess with its state, to make sure it cleans up
1452             # properly at the end of the cycle
1453             state = ss.bucket_counter.state
1454             self.failUnlessEqual(state["last-complete-prefix"],
1455                                  ss.bucket_counter.prefixes[0])
1456             state["bucket-counts"][-12] = {}
1457             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1458             ss.bucket_counter.save_state()
1459         d.addCallback(_after_first_prefix)
1460
1461         # now give it enough time to complete a cycle
1462         def _watch():
1463             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1464         d.addCallback(lambda ignored: self.poll(_watch))
1465         def _check2(ignored):
1466             ss.bucket_counter.cpu_slice = orig_cpu_slice
1467             s = ss.bucket_counter.get_state()
1468             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1469             self.failIf("bogusprefix!" in s["storage-index-samples"],
1470                         s["storage-index-samples"].keys())
1471         d.addCallback(_check2)
1472         return d
1473
1474     def test_bucket_counter_eta(self):
1475         basedir = "storage/BucketCounter/bucket_counter_eta"
1476         fileutil.make_dirs(basedir)
1477         ss = MyStorageServer(basedir, "\x00" * 20)
1478         ss.bucket_counter.slow_start = 0
1479         # these will be fired inside finished_prefix()
1480         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1481         w = StorageStatus(ss)
1482
1483         d = defer.Deferred()
1484
1485         def _check_1(ignored):
1486             # no ETA is available yet
1487             html = w.renderSynchronously()
1488             s = remove_tags(html)
1489             self.failUnlessIn("complete (next work", s)
1490
1491         def _check_2(ignored):
1492             # one prefix has finished, so an ETA based upon that elapsed time
1493             # should be available.
1494             html = w.renderSynchronously()
1495             s = remove_tags(html)
1496             self.failUnlessIn("complete (ETA ", s)
1497
1498         def _check_3(ignored):
1499             # two prefixes have finished
1500             html = w.renderSynchronously()
1501             s = remove_tags(html)
1502             self.failUnlessIn("complete (ETA ", s)
1503             d.callback("done")
1504
1505         hooks[0].addCallback(_check_1).addErrback(d.errback)
1506         hooks[1].addCallback(_check_2).addErrback(d.errback)
1507         hooks[2].addCallback(_check_3).addErrback(d.errback)
1508
1509         ss.setServiceParent(self.s)
1510         return d
1511
1512 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
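    # force the crawler to yield right after its first bucket (by making
    # cpu_slice negative), then restore a large cpu_slice on the next yield
    # so the rest of the cycle runs quickly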
1513     stop_after_first_bucket = False
1514     def process_bucket(self, *args, **kwargs):
1515         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1516         if self.stop_after_first_bucket:
1517             self.stop_after_first_bucket = False
1518             self.cpu_slice = -1.0
1519     def yielding(self, sleep_time):
1520         if not self.stop_after_first_bucket:
1521             self.cpu_slice = 500
1522
1523 class BrokenStatResults:
1524     pass
1525 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
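    # wrap os.stat() results in a BrokenStatResults that lacks .st_blocks,
    # to simulate platforms whose stat results do not provide it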
1526     def stat(self, fn):
1527         s = os.stat(fn)
1528         bsr = BrokenStatResults()
1529         for attrname in dir(s):
1530             if attrname.startswith("_"):
1531                 continue
1532             if attrname == "st_blocks":
1533                 continue
1534             setattr(bsr, attrname, getattr(s, attrname))
1535         return bsr
1536
1537 class InstrumentedStorageServer(StorageServer):
1538     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1539 class No_ST_BLOCKS_StorageServer(StorageServer):
1540     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1541
1542 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1543
1544     def setUp(self):
1545         self.s = service.MultiService()
1546         self.s.startService()
1547     def tearDown(self):
1548         return self.s.stopService()
1549
1550     def make_shares(self, ss):
1551         def make(si):
1552             return (si, hashutil.tagged_hash("renew", si),
1553                     hashutil.tagged_hash("cancel", si))
1554         def make_mutable(si):
1555             return (si, hashutil.tagged_hash("renew", si),
1556                     hashutil.tagged_hash("cancel", si),
1557                     hashutil.tagged_hash("write-enabler", si))
1558         def make_extra_lease(si, num):
1559             return (hashutil.tagged_hash("renew-%d" % num, si),
1560                     hashutil.tagged_hash("cancel-%d" % num, si))
1561
1562         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1563         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1564         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1565         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1566         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1567         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1568         sharenums = [0]
1569         canary = FakeCanary()
1570         # note: 'tahoe debug dump-share' will not handle this file, since the
1571         # inner contents are not a valid CHK share
1572         data = "\xff" * 1000
1573
1574         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1575                                          1000, canary)
1576         w[0].remote_write(0, data)
1577         w[0].remote_close()
1578
1579         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1580                                          1000, canary)
1581         w[0].remote_write(0, data)
1582         w[0].remote_close()
1583         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1584
1585         writev = ss.remote_slot_testv_and_readv_and_writev
1586         writev(mutable_si_2, (we2, rs2, cs2),
1587                {0: ([], [(0,data)], len(data))}, [])
1588         writev(mutable_si_3, (we3, rs3, cs3),
1589                {0: ([], [(0,data)], len(data))}, [])
1590         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1591
1592         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1593         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1594         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
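        # in total: four single-share buckets (two immutable, two mutable)
        # carrying six leases, which is what the lease-age-histogram and
        # leases-per-share-histogram assertions below count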
1595
1596     def test_basic(self):
1597         basedir = "storage/LeaseCrawler/basic"
1598         fileutil.make_dirs(basedir)
1599         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1600         # make it start sooner than usual.
1601         lc = ss.lease_checker
1602         lc.slow_start = 0
1603         lc.cpu_slice = 500
1604         lc.stop_after_first_bucket = True
1605         webstatus = StorageStatus(ss)
1606
1607         # create a few shares, with some leases on them
1608         self.make_shares(ss)
1609         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1610
1611         # add a non-sharefile to exercise another code path
1612         fn = os.path.join(ss.sharedir,
1613                           storage_index_to_dir(immutable_si_0),
1614                           "not-a-share")
1615         f = open(fn, "wb")
1616         f.write("I am not a share.\n")
1617         f.close()
1618
1619         # this is before the crawl has started, so we're not in a cycle yet
1620         initial_state = lc.get_state()
1621         self.failIf(lc.get_progress()["cycle-in-progress"])
1622         self.failIf("cycle-to-date" in initial_state)
1623         self.failIf("estimated-remaining-cycle" in initial_state)
1624         self.failIf("estimated-current-cycle" in initial_state)
1625         self.failUnless("history" in initial_state)
1626         self.failUnlessEqual(initial_state["history"], {})
1627
1628         ss.setServiceParent(self.s)
1629
1630         DAY = 24*60*60
1631
1632         d = fireEventually()
1633
1634         # now examine the state right after the first bucket has been
1635         # processed.
1636         def _after_first_bucket(ignored):
1637             initial_state = lc.get_state()
1638             self.failUnless("cycle-to-date" in initial_state)
1639             self.failUnless("estimated-remaining-cycle" in initial_state)
1640             self.failUnless("estimated-current-cycle" in initial_state)
1641             self.failUnless("history" in initial_state)
1642             self.failUnlessEqual(initial_state["history"], {})
1643
1644             so_far = initial_state["cycle-to-date"]
1645             self.failUnlessEqual(so_far["expiration-enabled"], False)
1646             self.failUnless("configured-expiration-mode" in so_far)
1647             self.failUnless("lease-age-histogram" in so_far)
1648             lah = so_far["lease-age-histogram"]
1649             self.failUnlessEqual(type(lah), list)
1650             self.failUnlessEqual(len(lah), 1)
1651             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1652             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1653             self.failUnlessEqual(so_far["corrupt-shares"], [])
1654             sr1 = so_far["space-recovered"]
1655             self.failUnlessEqual(sr1["examined-buckets"], 1)
1656             self.failUnlessEqual(sr1["examined-shares"], 1)
1657             self.failUnlessEqual(sr1["actual-shares"], 0)
1658             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1659             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1660             left = initial_state["estimated-remaining-cycle"]
1661             sr2 = left["space-recovered"]
1662             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1663             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1664             self.failIfEqual(sr2["actual-shares"], None)
1665             self.failIfEqual(sr2["configured-diskbytes"], None)
1666             self.failIfEqual(sr2["original-sharebytes"], None)
1667         d.addCallback(_after_first_bucket)
1668         d.addCallback(lambda ign: self.render1(webstatus))
1669         def _check_html_in_cycle(html):
1670             s = remove_tags(html)
1671             self.failUnlessIn("So far, this cycle has examined "
1672                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1673             self.failUnlessIn("and has recovered: "
1674                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1675                               "0 B (0 B / 0 B)", s)
1676             self.failUnlessIn("If expiration were enabled, "
1677                               "we would have recovered: "
1678                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1679                               " 0 B (0 B / 0 B) by now", s)
1680             self.failUnlessIn("and the remainder of this cycle "
1681                               "would probably recover: "
1682                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1683                               " 0 B (0 B / 0 B)", s)
1684             self.failUnlessIn("and the whole cycle would probably recover: "
1685                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1686                               " 0 B (0 B / 0 B)", s)
1687             self.failUnlessIn("if we were strictly using each lease's default "
1688                               "31-day lease lifetime", s)
1689             self.failUnlessIn("this cycle would be expected to recover: ", s)
1690         d.addCallback(_check_html_in_cycle)
1691
1692         # wait for the crawler to finish the first cycle. Nothing should have
1693         # been removed.
1694         def _wait():
1695             return lc.get_state()["last-cycle-finished"] is not None
1696         d.addCallback(lambda ign: self.poll(_wait))
1697
1698         def _after_first_cycle(ignored):
1699             s = lc.get_state()
1700             self.failIf("cycle-to-date" in s)
1701             self.failIf("estimated-remaining-cycle" in s)
1702             self.failIf("estimated-current-cycle" in s)
1703             last = s["history"][0]
1704             self.failUnless("cycle-start-finish-times" in last)
1705             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1706             self.failUnlessEqual(last["expiration-enabled"], False)
1707             self.failUnless("configured-expiration-mode" in last)
1708
1709             self.failUnless("lease-age-histogram" in last)
1710             lah = last["lease-age-histogram"]
1711             self.failUnlessEqual(type(lah), list)
1712             self.failUnlessEqual(len(lah), 1)
1713             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1714
1715             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1716             self.failUnlessEqual(last["corrupt-shares"], [])
1717
1718             rec = last["space-recovered"]
1719             self.failUnlessEqual(rec["examined-buckets"], 4)
1720             self.failUnlessEqual(rec["examined-shares"], 4)
1721             self.failUnlessEqual(rec["actual-buckets"], 0)
1722             self.failUnlessEqual(rec["original-buckets"], 0)
1723             self.failUnlessEqual(rec["configured-buckets"], 0)
1724             self.failUnlessEqual(rec["actual-shares"], 0)
1725             self.failUnlessEqual(rec["original-shares"], 0)
1726             self.failUnlessEqual(rec["configured-shares"], 0)
1727             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1728             self.failUnlessEqual(rec["original-diskbytes"], 0)
1729             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1730             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1731             self.failUnlessEqual(rec["original-sharebytes"], 0)
1732             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1733
1734             def _get_sharefile(si):
1735                 return list(ss._iter_share_files(si))[0]
1736             def count_leases(si):
1737                 return len(list(_get_sharefile(si).get_leases()))
1738             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1739             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1740             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1741             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1742         d.addCallback(_after_first_cycle)
1743         d.addCallback(lambda ign: self.render1(webstatus))
1744         def _check_html(html):
1745             s = remove_tags(html)
1746             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1747                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1748                               "but expiration was not enabled", s)
1749         d.addCallback(_check_html)
1750         d.addCallback(lambda ign: self.render_json(webstatus))
1751         def _check_json(json):
1752             data = simplejson.loads(json)
1753             self.failUnless("lease-checker" in data)
1754             self.failUnless("lease-checker-progress" in data)
1755         d.addCallback(_check_json)
1756         return d
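    # the crawler state observed above follows a simple life-cycle (all of
    # these keys are the ones asserted on in this test):
    #   before the cycle: "history" == {} and no "cycle-to-date"
    #   mid-cycle:        "cycle-to-date", "estimated-remaining-cycle", and
    #                     "estimated-current-cycle" are all present
    #   after the cycle:  the estimates disappear and the results land in
    #                     "history"[0]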
1757
1758     def backdate_lease(self, sf, renew_secret, new_expire_time):
1759         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1760         # "renew" a lease with a new_expire_time that is older than what the
1761         # current lease has), so we have to reach inside it.
1762         for i,lease in enumerate(sf.get_leases()):
1763             if lease.renew_secret == renew_secret:
1764                 lease.expiration_time = new_expire_time
1765                 f = open(sf.home, 'rb+')
1766                 sf._write_lease_record(f, i, lease)
1767                 f.close()
1768                 return
1769         raise IndexError("unable to renew non-existent lease")
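    # used below as, e.g., self.backdate_lease(sf, renew_secret, now - 1000)
    # to make a lease's recorded expiration time lie 1000 seconds in the past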
1770
1771     def test_expire_age(self):
1772         basedir = "storage/LeaseCrawler/expire_age"
1773         fileutil.make_dirs(basedir)
1774         # setting expiration_override_lease_duration to 2000 means that
1775         # any lease which is more than 2000s old will be expired.
1776         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1777                                        expiration_enabled=True,
1778                                        expiration_mode="age",
1779                                        expiration_override_lease_duration=2000)
1780         # make it start sooner than usual.
1781         lc = ss.lease_checker
1782         lc.slow_start = 0
1783         lc.stop_after_first_bucket = True
1784         webstatus = StorageStatus(ss)
1785
1786         # create a few shares, with some leases on them
1787         self.make_shares(ss)
1788         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1789
1790         def count_shares(si):
1791             return len(list(ss._iter_share_files(si)))
1792         def _get_sharefile(si):
1793             return list(ss._iter_share_files(si))[0]
1794         def count_leases(si):
1795             return len(list(_get_sharefile(si).get_leases()))
1796
1797         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1798         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1799         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1800         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1801         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1802         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1803         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1804         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1805
1806         # artificially crank back the expiration time on the first lease of
1807         # each share, to make it look like it expired 1000s ago.
1808         # Some shares have an extra lease which is set to expire at the
1809         # default time in 31 days from now (age=31days). We then run the
1810         # crawler, which will expire the first lease, making some shares get
1811         # deleted and others stay alive (with one remaining lease)
1812         now = time.time()
1813
1814         sf0 = _get_sharefile(immutable_si_0)
1815         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1816         sf0_size = os.stat(sf0.home).st_size
1817
1818         # immutable_si_1 gets an extra lease
1819         sf1 = _get_sharefile(immutable_si_1)
1820         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1821
1822         sf2 = _get_sharefile(mutable_si_2)
1823         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1824         sf2_size = os.stat(sf2.home).st_size
1825
1826         # mutable_si_3 gets an extra lease
1827         sf3 = _get_sharefile(mutable_si_3)
1828         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1829
1830         ss.setServiceParent(self.s)
1831
1832         d = fireEventually()
1833         # examine the state right after the first bucket has been processed
1834         def _after_first_bucket(ignored):
1835             p = lc.get_progress()
1836             self.failUnless(p["cycle-in-progress"])
1837         d.addCallback(_after_first_bucket)
1838         d.addCallback(lambda ign: self.render1(webstatus))
1839         def _check_html_in_cycle(html):
1840             s = remove_tags(html)
1841             # the first bucket encountered gets deleted, and its prefix
1842             # happens to be about 1/5th of the way through the ring, so the
1843             # predictor thinks we'll have 5 shares and that we'll delete them
1844             # all. This part of the test depends upon the SIs landing right
1845             # where they do now.
1846             self.failUnlessIn("The remainder of this cycle is expected to "
1847                               "recover: 4 shares, 4 buckets", s)
1848             self.failUnlessIn("The whole cycle is expected to examine "
1849                               "5 shares in 5 buckets and to recover: "
1850                               "5 shares, 5 buckets", s)
1851         d.addCallback(_check_html_in_cycle)
1852
1853         # wait for the crawler to finish the first cycle. Two shares should
1854         # have been removed
1855         def _wait():
1856             return lc.get_state()["last-cycle-finished"] is not None
1857         d.addCallback(lambda ign: self.poll(_wait))
1858
1859         def _after_first_cycle(ignored):
1860             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1861             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1862             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1863             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1864             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1865             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1866
1867             s = lc.get_state()
1868             last = s["history"][0]
1869
1870             self.failUnlessEqual(last["expiration-enabled"], True)
1871             self.failUnlessEqual(last["configured-expiration-mode"],
1872                                  ("age", 2000, None, ("mutable", "immutable")))
1873             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1874
1875             rec = last["space-recovered"]
1876             self.failUnlessEqual(rec["examined-buckets"], 4)
1877             self.failUnlessEqual(rec["examined-shares"], 4)
1878             self.failUnlessEqual(rec["actual-buckets"], 2)
1879             self.failUnlessEqual(rec["original-buckets"], 2)
1880             self.failUnlessEqual(rec["configured-buckets"], 2)
1881             self.failUnlessEqual(rec["actual-shares"], 2)
1882             self.failUnlessEqual(rec["original-shares"], 2)
1883             self.failUnlessEqual(rec["configured-shares"], 2)
1884             size = sf0_size + sf2_size
1885             self.failUnlessEqual(rec["actual-sharebytes"], size)
1886             self.failUnlessEqual(rec["original-sharebytes"], size)
1887             self.failUnlessEqual(rec["configured-sharebytes"], size)
1888             # different platforms have different notions of "blocks used by
1889             # this file", so merely assert that it's a number
1890             self.failUnless(rec["actual-diskbytes"] >= 0,
1891                             rec["actual-diskbytes"])
1892             self.failUnless(rec["original-diskbytes"] >= 0,
1893                             rec["original-diskbytes"])
1894             self.failUnless(rec["configured-diskbytes"] >= 0,
1895                             rec["configured-diskbytes"])
1896         d.addCallback(_after_first_cycle)
1897         d.addCallback(lambda ign: self.render1(webstatus))
1898         def _check_html(html):
1899             s = remove_tags(html)
1900             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1901             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1902             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1903         d.addCallback(_check_html)
1904         return d
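    # a sketch of the age-mode rule that _check_html just asserted ("leases
    # created or last renewed more than 33 minutes ago will be considered
    # expired"). It assumes, as backdate_lease and the cutoff-date test do,
    # that a lease's grant/renew time is its stored expiration time minus
    # the default 31-day lifetime:
    def _example_age_expired(self, expiration_time, now, duration=2000):
        grant_renew_time = expiration_time - 31*24*60*60
        return (now - grant_renew_time) > duration
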
1905
1906     def test_expire_cutoff_date(self):
1907         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1908         fileutil.make_dirs(basedir)
1909         # setting cutoff-date to 2000 seconds ago means that any lease which
1910         # is more than 2000s old will be expired.
1911         now = time.time()
1912         then = int(now - 2000)
1913         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1914                                        expiration_enabled=True,
1915                                        expiration_mode="cutoff-date",
1916                                        expiration_cutoff_date=then)
1917         # make it start sooner than usual.
1918         lc = ss.lease_checker
1919         lc.slow_start = 0
1920         lc.stop_after_first_bucket = True
1921         webstatus = StorageStatus(ss)
1922
1923         # create a few shares, with some leases on them
1924         self.make_shares(ss)
1925         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1926
1927         def count_shares(si):
1928             return len(list(ss._iter_share_files(si)))
1929         def _get_sharefile(si):
1930             return list(ss._iter_share_files(si))[0]
1931         def count_leases(si):
1932             return len(list(_get_sharefile(si).get_leases()))
1933
1934         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1935         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1936         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1937         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1938         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1939         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1940         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1941         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1942
1943         # artificially crank back the expiration time on the first lease of
1944         # each share, to make it look like it was renewed 3000s ago. To achieve
1945         # this, we need to set the expiration time to now-3000+31days. This
1946         # will change when the lease format is improved to contain both
1947         # create/renew time and duration.
1948         new_expiration_time = now - 3000 + 31*24*60*60
1949
1950         # Some shares have an extra lease which is set to expire at the
1951         # default time in 31 days from now (age=31days). We then run the
1952         # crawler, which will expire the first lease, making some shares get
1953         # deleted and others stay alive (with one remaining lease)
1954
1955         sf0 = _get_sharefile(immutable_si_0)
1956         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1957         sf0_size = os.stat(sf0.home).st_size
1958
1959         # immutable_si_1 gets an extra lease
1960         sf1 = _get_sharefile(immutable_si_1)
1961         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1962
1963         sf2 = _get_sharefile(mutable_si_2)
1964         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1965         sf2_size = os.stat(sf2.home).st_size
1966
1967         # mutable_si_3 gets an extra lease
1968         sf3 = _get_sharefile(mutable_si_3)
1969         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1970
1971         ss.setServiceParent(self.s)
1972
1973         d = fireEventually()
1974         # examine the state right after the first bucket has been processed
1975         def _after_first_bucket(ignored):
1976             p = lc.get_progress()
1977             self.failUnless(p["cycle-in-progress"])
1978         d.addCallback(_after_first_bucket)
1979         d.addCallback(lambda ign: self.render1(webstatus))
1980         def _check_html_in_cycle(html):
1981             s = remove_tags(html)
1982             # the first bucket encountered gets deleted, and its prefix
1983             # happens to be about 1/5th of the way through the ring, so the
1984             # predictor thinks we'll have 5 shares and that we'll delete them
1985             # all. This part of the test depends upon the SIs landing right
1986             # where they do now.
1987             self.failUnlessIn("The remainder of this cycle is expected to "
1988                               "recover: 4 shares, 4 buckets", s)
1989             self.failUnlessIn("The whole cycle is expected to examine "
1990                               "5 shares in 5 buckets and to recover: "
1991                               "5 shares, 5 buckets", s)
1992         d.addCallback(_check_html_in_cycle)
1993
1994         # wait for the crawler to finish the first cycle. Two shares should
1995         # have been removed
1996         def _wait():
1997             return lc.get_state()["last-cycle-finished"] is not None
1998         d.addCallback(lambda ign: self.poll(_wait))
1999
2000         def _after_first_cycle(ignored):
2001             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2002             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2003             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2004             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2005             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2006             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2007
2008             s = lc.get_state()
2009             last = s["history"][0]
2010
2011             self.failUnlessEqual(last["expiration-enabled"], True)
2012             self.failUnlessEqual(last["configured-expiration-mode"],
2013                                  ("cutoff-date", None, then,
2014                                   ("mutable", "immutable")))
2015             self.failUnlessEqual(last["leases-per-share-histogram"],
2016                                  {1: 2, 2: 2})
2017
2018             rec = last["space-recovered"]
2019             self.failUnlessEqual(rec["examined-buckets"], 4)
2020             self.failUnlessEqual(rec["examined-shares"], 4)
2021             self.failUnlessEqual(rec["actual-buckets"], 2)
2022             self.failUnlessEqual(rec["original-buckets"], 0)
2023             self.failUnlessEqual(rec["configured-buckets"], 2)
2024             self.failUnlessEqual(rec["actual-shares"], 2)
2025             self.failUnlessEqual(rec["original-shares"], 0)
2026             self.failUnlessEqual(rec["configured-shares"], 2)
2027             size = sf0_size + sf2_size
2028             self.failUnlessEqual(rec["actual-sharebytes"], size)
2029             self.failUnlessEqual(rec["original-sharebytes"], 0)
2030             self.failUnlessEqual(rec["configured-sharebytes"], size)
2031             # different platforms have different notions of "blocks used by
2032             # this file", so merely assert that it's a number
2033             self.failUnless(rec["actual-diskbytes"] >= 0,
2034                             rec["actual-diskbytes"])
2035             self.failUnless(rec["original-diskbytes"] >= 0,
2036                             rec["original-diskbytes"])
2037             self.failUnless(rec["configured-diskbytes"] >= 0,
2038                             rec["configured-diskbytes"])
2039         d.addCallback(_after_first_cycle)
2040         d.addCallback(lambda ign: self.render1(webstatus))
2041         def _check_html(html):
2042             s = remove_tags(html)
2043             self.failUnlessIn("Expiration Enabled:"
2044                               " expired leases will be removed", s)
2045             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2046             substr = "Leases created or last renewed before %s will be considered expired." % date
2047             self.failUnlessIn(substr, s)
2048             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2049         d.addCallback(_check_html)
2050         return d
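    # the matching sketch for cutoff-date mode, per the substring asserted
    # above: a lease is expired if it was created or last renewed before the
    # cutoff (same 31-day assumption as the age-mode sketch):
    def _example_cutoff_expired(self, expiration_time, cutoff_date):
        grant_renew_time = expiration_time - 31*24*60*60
        return grant_renew_time < cutoff_date
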
2051
2052     def test_only_immutable(self):
2053         basedir = "storage/LeaseCrawler/only_immutable"
2054         fileutil.make_dirs(basedir)
2055         now = time.time()
2056         then = int(now - 2000)
2057         ss = StorageServer(basedir, "\x00" * 20,
2058                            expiration_enabled=True,
2059                            expiration_mode="cutoff-date",
2060                            expiration_cutoff_date=then,
2061                            expiration_sharetypes=("immutable",))
2062         lc = ss.lease_checker
2063         lc.slow_start = 0
2064         webstatus = StorageStatus(ss)
2065
2066         self.make_shares(ss)
2067         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2068         # set all leases to be expirable
2069         new_expiration_time = now - 3000 + 31*24*60*60
2070
2071         def count_shares(si):
2072             return len(list(ss._iter_share_files(si)))
2073         def _get_sharefile(si):
2074             return list(ss._iter_share_files(si))[0]
2075         def count_leases(si):
2076             return len(list(_get_sharefile(si).get_leases()))
2077
2078         sf0 = _get_sharefile(immutable_si_0)
2079         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2080         sf1 = _get_sharefile(immutable_si_1)
2081         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2082         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2083         sf2 = _get_sharefile(mutable_si_2)
2084         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2085         sf3 = _get_sharefile(mutable_si_3)
2086         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2087         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2088
2089         ss.setServiceParent(self.s)
2090         def _wait():
2091             return lc.get_state()["last-cycle-finished"] is not None
2092         d = self.poll(_wait)
2093
2094         def _after_first_cycle(ignored):
2095             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2096             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2097             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2098             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2099             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2100             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2101         d.addCallback(_after_first_cycle)
2102         d.addCallback(lambda ign: self.render1(webstatus))
2103         def _check_html(html):
2104             s = remove_tags(html)
2105             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2106         d.addCallback(_check_html)
2107         return d
2108
2109     def test_only_mutable(self):
2110         basedir = "storage/LeaseCrawler/only_mutable"
2111         fileutil.make_dirs(basedir)
2112         now = time.time()
2113         then = int(now - 2000)
2114         ss = StorageServer(basedir, "\x00" * 20,
2115                            expiration_enabled=True,
2116                            expiration_mode="cutoff-date",
2117                            expiration_cutoff_date=then,
2118                            expiration_sharetypes=("mutable",))
2119         lc = ss.lease_checker
2120         lc.slow_start = 0
2121         webstatus = StorageStatus(ss)
2122
2123         self.make_shares(ss)
2124         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2125         # set all leases to be expirable
2126         new_expiration_time = now - 3000 + 31*24*60*60
2127
2128         def count_shares(si):
2129             return len(list(ss._iter_share_files(si)))
2130         def _get_sharefile(si):
2131             return list(ss._iter_share_files(si))[0]
2132         def count_leases(si):
2133             return len(list(_get_sharefile(si).get_leases()))
2134
2135         sf0 = _get_sharefile(immutable_si_0)
2136         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2137         sf1 = _get_sharefile(immutable_si_1)
2138         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2139         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2140         sf2 = _get_sharefile(mutable_si_2)
2141         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2142         sf3 = _get_sharefile(mutable_si_3)
2143         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2144         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2145
2146         ss.setServiceParent(self.s)
2147         def _wait():
2148             return lc.get_state()["last-cycle-finished"] is not None
2149         d = self.poll(_wait)
2150
2151         def _after_first_cycle(ignored):
2152             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2153             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2154             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2155             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2156             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2157             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2158         d.addCallback(_after_first_cycle)
2159         d.addCallback(lambda ign: self.render1(webstatus))
2160         def _check_html(html):
2161             s = remove_tags(html)
2162             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2163         d.addCallback(_check_html)
2164         return d
2165
2166     def test_bad_mode(self):
2167         basedir = "storage/LeaseCrawler/bad_mode"
2168         fileutil.make_dirs(basedir)
2169         e = self.failUnlessRaises(ValueError,
2170                                   StorageServer, basedir, "\x00" * 20,
2171                                   expiration_mode="bogus")
2172         self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))
2173
2174     def test_parse_duration(self):
2175         DAY = 24*60*60
2176         MONTH = 31*DAY
2177         YEAR = 365*DAY
2178         p = time_format.parse_duration
2179         self.failUnlessEqual(p("7days"), 7*DAY)
2180         self.failUnlessEqual(p("31day"), 31*DAY)
2181         self.failUnlessEqual(p("60 days"), 60*DAY)
2182         self.failUnlessEqual(p("2mo"), 2*MONTH)
2183         self.failUnlessEqual(p("3 month"), 3*MONTH)
2184         self.failUnlessEqual(p("2years"), 2*YEAR)
2185         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2186         self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2187                         in str(e), str(e))
2188
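    # a minimal parser satisfying the assertions above (a sketch only; the
    # real time_format.parse_duration may differ in detail):
    def _example_parse_duration(self, s):
        DAY = 24*60*60
        units = [("day", DAY), ("mo", 31*DAY), ("month", 31*DAY),
                 ("year", 365*DAY)]
        s = s.strip().lower()
        for suffix, seconds in units:
            if suffix in s:
                return int(s.split(suffix)[0].strip()) * seconds
        raise ValueError("no unit (like day, month, or year) in %r" % s)
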
2189     def test_parse_date(self):
2190         p = time_format.parse_date
2191         self.failUnless(isinstance(p("2009-03-18"), int))
2192         self.failUnlessEqual(p("2009-03-18"), 1237334400)
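        # the expected value is midnight UTC; reproducible with the stdlib:
        #   import calendar
        #   calendar.timegm(time.strptime("2009-03-18", "%Y-%m-%d"))
        #   => 1237334400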
2193
2194     def test_limited_history(self):
2195         basedir = "storage/LeaseCrawler/limited_history"
2196         fileutil.make_dirs(basedir)
2197         ss = StorageServer(basedir, "\x00" * 20)
2198         # make it start sooner than usual.
2199         lc = ss.lease_checker
2200         lc.slow_start = 0
2201         lc.cpu_slice = 500
2202
2203         # create a few shares, with some leases on them
2204         self.make_shares(ss)
2205
2206         ss.setServiceParent(self.s)
2207
2208         def _wait_until_15_cycles_done():
2209             last = lc.state["last-cycle-finished"]
2210             if last is not None and last >= 15:
2211                 return True
2212             if lc.timer:
2213                 lc.timer.reset(0)
2214             return False
2215         d = self.poll(_wait_until_15_cycles_done)
2216
2217         def _check(ignored):
2218             s = lc.get_state()
2219             h = s["history"]
2220             self.failUnlessEqual(len(h), 10)
2221             self.failUnlessEqual(max(h.keys()), 15)
2222             self.failUnlessEqual(min(h.keys()), 6)
2223         d.addCallback(_check)
2224         return d
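    # the retention rule this test pins down: only the 10 most recent cycle
    # results are kept, with oldest-first eviction, e.g.
    #   while len(history) > 10:
    #       del history[min(history.keys())]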
2225
2226     def test_unpredictable_future(self):
2227         basedir = "storage/LeaseCrawler/unpredictable_future"
2228         fileutil.make_dirs(basedir)
2229         ss = StorageServer(basedir, "\x00" * 20)
2230         # make it start sooner than usual.
2231         lc = ss.lease_checker
2232         lc.slow_start = 0
2233         lc.cpu_slice = -1.0 # stop quickly
2234
2235         self.make_shares(ss)
2236
2237         ss.setServiceParent(self.s)
2238
2239         d = fireEventually()
2240         def _check(ignored):
2241             # this should fire after the first bucket is complete, but before
2242             # the first prefix is complete, so the progress-measurer won't
2243             # think we've gotten far enough to raise our percent-complete
2244             # above 0%, triggering the cannot-predict-the-future code in
2245             # expirer.py . This will have to change if/when the
2246             # progress-measurer gets smart enough to count buckets (we'll
2247             # have to interrupt it even earlier, before it's finished the
2248             # first bucket).
2249             s = lc.get_state()
2250             self.failUnless("cycle-to-date" in s)
2251             self.failUnless("estimated-remaining-cycle" in s)
2252             self.failUnless("estimated-current-cycle" in s)
2253
2254             left = s["estimated-remaining-cycle"]["space-recovered"]
2255             self.failUnlessEqual(left["actual-buckets"], None)
2256             self.failUnlessEqual(left["original-buckets"], None)
2257             self.failUnlessEqual(left["configured-buckets"], None)
2258             self.failUnlessEqual(left["actual-shares"], None)
2259             self.failUnlessEqual(left["original-shares"], None)
2260             self.failUnlessEqual(left["configured-shares"], None)
2261             self.failUnlessEqual(left["actual-diskbytes"], None)
2262             self.failUnlessEqual(left["original-diskbytes"], None)
2263             self.failUnlessEqual(left["configured-diskbytes"], None)
2264             self.failUnlessEqual(left["actual-sharebytes"], None)
2265             self.failUnlessEqual(left["original-sharebytes"], None)
2266             self.failUnlessEqual(left["configured-sharebytes"], None)
2267
2268             full = s["estimated-current-cycle"]["space-recovered"]
2269             self.failUnlessEqual(full["actual-buckets"], None)
2270             self.failUnlessEqual(full["original-buckets"], None)
2271             self.failUnlessEqual(full["configured-buckets"], None)
2272             self.failUnlessEqual(full["actual-shares"], None)
2273             self.failUnlessEqual(full["original-shares"], None)
2274             self.failUnlessEqual(full["configured-shares"], None)
2275             self.failUnlessEqual(full["actual-diskbytes"], None)
2276             self.failUnlessEqual(full["original-diskbytes"], None)
2277             self.failUnlessEqual(full["configured-diskbytes"], None)
2278             self.failUnlessEqual(full["actual-sharebytes"], None)
2279             self.failUnlessEqual(full["original-sharebytes"], None)
2280             self.failUnlessEqual(full["configured-sharebytes"], None)
2281
2282         d.addCallback(_check)
2283         return d
2284
2285     def test_no_st_blocks(self):
2286         basedir = "storage/LeaseCrawler/no_st_blocks"
2287         fileutil.make_dirs(basedir)
2288         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2289                                         expiration_mode="age",
2290                                         expiration_override_lease_duration=-1000)
2291         # a negative expiration_override_lease_duration= means every lease
2292         # is already considered expired, so the "configured-" space-recovered
2293         # counts will be non-zero
2294
2295         # make it start sooner than usual.
2296         lc = ss.lease_checker
2297         lc.slow_start = 0
2298
2299         self.make_shares(ss)
2300         ss.setServiceParent(self.s)
2301         def _wait():
2302             return lc.get_state()["last-cycle-finished"] is not None
2303         d = self.poll(_wait)
2304
2305         def _check(ignored):
2306             s = lc.get_state()
2307             last = s["history"][0]
2308             rec = last["space-recovered"]
2309             self.failUnlessEqual(rec["configured-buckets"], 4)
2310             self.failUnlessEqual(rec["configured-shares"], 4)
2311             self.failUnless(rec["configured-sharebytes"] > 0,
2312                             rec["configured-sharebytes"])
2313             # without the .st_blocks field in os.stat() results, we should be
2314             # reporting diskbytes==sharebytes
2315             self.failUnlessEqual(rec["configured-sharebytes"],
2316                                  rec["configured-diskbytes"])
2317         d.addCallback(_check)
2318         return d
2319
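    # a sketch of the fallback this test exercises. The 512-byte multiplier
    # for st_blocks is a POSIX convention (an assumption here); the fallback
    # to st_size is exactly what the diskbytes==sharebytes assertion checks:
    def _example_diskbytes(self, statresults):
        if hasattr(statresults, "st_blocks"):
            return statresults.st_blocks * 512
        return statresults.st_size
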
2320     def test_share_corruption(self):
2321         self._poll_should_ignore_these_errors = [
2322             UnknownMutableContainerVersionError,
2323             UnknownImmutableContainerVersionError,
2324             ]
2325         basedir = "storage/LeaseCrawler/share_corruption"
2326         fileutil.make_dirs(basedir)
2327         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2328         w = StorageStatus(ss)
2329         # make it start sooner than usual.
2330         lc = ss.lease_checker
2331         lc.stop_after_first_bucket = True
2332         lc.slow_start = 0
2333         lc.cpu_slice = 500
2334
2335         # create a few shares, with some leases on them
2336         self.make_shares(ss)
2337
2338         # now corrupt one, and make sure the lease-checker keeps going
2339         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2340         first = min(self.sis)
2341         first_b32 = base32.b2a(first)
2342         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2343         f = open(fn, "rb+")
2344         f.seek(0)
2345         f.write("BAD MAGIC")
2346         f.close()
2347         # if get_share_file() doesn't see the correct mutable magic, it
2348         # assumes the file is an immutable share, and then
2349         # immutable.ShareFile sees a bad version. So whichever kind of
2350         # share we corrupted, this triggers an
2351         # UnknownImmutableContainerVersionError (sketched after this method).
2352
2353         # also create an empty bucket
2354         empty_si = "\x04"*16 # storage_index_to_dir() wants the binary SI
2355         empty_bucket_dir = os.path.join(ss.sharedir,
2356                                         storage_index_to_dir(empty_si))
2357         fileutil.make_dirs(empty_bucket_dir)
2358
2359         ss.setServiceParent(self.s)
2360
2361         d = fireEventually()
2362
2363         # now examine the state right after the first bucket has been
2364         # processed.
2365         def _after_first_bucket(ignored):
2366             so_far = lc.get_state()["cycle-to-date"]
2367             rec = so_far["space-recovered"]
2368             self.failUnlessEqual(rec["examined-buckets"], 1)
2369             self.failUnlessEqual(rec["examined-shares"], 0)
2370             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2371         d.addCallback(_after_first_bucket)
2372
2373         d.addCallback(lambda ign: self.render_json(w))
2374         def _check_json(json):
2375             data = simplejson.loads(json)
2376             # grr. json turns all dict keys into strings.
2377             so_far = data["lease-checker"]["cycle-to-date"]
2378             corrupt_shares = so_far["corrupt-shares"]
2379             # it also turns all tuples into lists
2380             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2381         d.addCallback(_check_json)
2382         d.addCallback(lambda ign: self.render1(w))
2383         def _check_html(html):
2384             s = remove_tags(html)
2385             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2386         d.addCallback(_check_html)
2387
2388         def _wait():
2389             return lc.get_state()["last-cycle-finished"] is not None
2390         d.addCallback(lambda ign: self.poll(_wait))
2391
2392         def _after_first_cycle(ignored):
2393             s = lc.get_state()
2394             last = s["history"][0]
2395             rec = last["space-recovered"]
2396             self.failUnlessEqual(rec["examined-buckets"], 5)
2397             self.failUnlessEqual(rec["examined-shares"], 3)
2398             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2399         d.addCallback(_after_first_cycle)
2400         d.addCallback(lambda ign: self.render_json(w))
2401         def _check_json_history(json):
2402             data = simplejson.loads(json)
2403             last = data["lease-checker"]["history"]["0"]
2404             corrupt_shares = last["corrupt-shares"]
2405             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2406         d.addCallback(_check_json_history)
2407         d.addCallback(lambda ign: self.render1(w))
2408         def _check_html_history(html):
2409             s = remove_tags(html)
2410             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2411         d.addCallback(_check_html_history)
2412
2413         def _cleanup(res):
2414             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2415                                    UnknownImmutableContainerVersionError)
2416             return res
2417         d.addBoth(_cleanup)
2418         return d
2419
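    # Illustrative sketch (hypothetical helper, not the real
    # get_share_file()): container types are told apart only by a magic
    # string at offset 0, which is why clobbering the first bytes of any
    # share ends in UnknownImmutableContainerVersionError.
    @staticmethod
    def _example_classify_container(f):
        f.seek(0)
        prefix = f.read(32)
        f.seek(0)
        if prefix.startswith("Tahoe mutable container v1\n"):
            return "mutable"
        return "immutable" # anything without the magic is assumed immutable
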
2420     def render_json(self, page):
2421         d = self.render1(page, args={"t": ["json"]})
2422         return d
2423
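# Illustrative sketch of why the JSON assertions above compare against
# [[first_b32, 0]] and index the history with the string "0": a
# simplejson round trip stringifies dict keys and turns tuples into
# lists.
def _example_json_roundtrip():
    state = {0: [("si", 0)]} # history-style: int key, tuple entries
    return simplejson.loads(simplejson.dumps(state)) # {"0": [["si", 0]]}
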
2424 class NoDiskStatsServer(StorageServer):
2425     def get_disk_stats(self):
2426         raise AttributeError # simulate a platform with no disk-stats API
2427
2428 class BadDiskStatsServer(StorageServer):
2429     def get_disk_stats(self):
2430         raise OSError # simulate a disk-stats API that exists but fails
2431
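# Illustrative sketch (hypothetical helper; the "avail" key is assumed):
# the policy the two fake servers above are built to exercise. No
# disk-stats API at all means sizes are unknown (None, rendered as "?"),
# while an API that exists but fails is treated as a full disk: zero
# bytes available, so no new shares are accepted.
def _example_available_space(server):
    try:
        stats = server.get_disk_stats()
    except AttributeError:
        return None # platform has no disk-stats API
    except EnvironmentError:
        return 0 # the API exists but the call failed; assume no space
    return stats.get("avail")
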
2432 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2433
2434     def setUp(self):
2435         self.s = service.MultiService()
2436         self.s.startService()
2437     def tearDown(self):
2438         return self.s.stopService()
2439
2440     def test_no_server(self):
2441         w = StorageStatus(None)
2442         html = w.renderSynchronously()
2443         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
2444
2445     def test_status(self):
2446         basedir = "storage/WebStatus/status"
2447         fileutil.make_dirs(basedir)
2448         ss = StorageServer(basedir, "\x00" * 20)
2449         ss.setServiceParent(self.s)
2450         w = StorageStatus(ss)
2451         d = self.render1(w)
2452         def _check_html(html):
2453             self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2454             s = remove_tags(html)
2455             self.failUnless("Accepting new shares: Yes" in s, s)
2456             self.failUnless("Reserved space: - 0 B (0)" in s, s)
2457         d.addCallback(_check_html)
2458         d.addCallback(lambda ign: self.render_json(w))
2459         def _check_json(json):
2460             data = simplejson.loads(json)
2461             s = data["stats"]
2462             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2463             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2464             self.failUnless("bucket-counter" in data)
2465             self.failUnless("lease-checker" in data)
2466         d.addCallback(_check_json)
2467         return d
2468
2469     def render_json(self, page):
2470         d = self.render1(page, args={"t": ["json"]})
2471         return d
2472
2473     def test_status_no_disk_stats(self):
2474         # Some platforms have no disk-stats API at all. Make sure the
2475         # code handles that gracefully (this test runs on all platforms).
2476         basedir = "storage/WebStatus/status_no_disk_stats"
2477         fileutil.make_dirs(basedir)
2478         ss = NoDiskStatsServer(basedir, "\x00" * 20)
2479         ss.setServiceParent(self.s)
2480         w = StorageStatus(ss)
2481         html = w.renderSynchronously()
2482         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2483         s = remove_tags(html)
2484         self.failUnless("Accepting new shares: Yes" in s, s)
2485         self.failUnless("Total disk space: ?" in s, s)
2486         self.failUnless("Space Available to Tahoe: ?" in s, s)
2487         self.failUnless(ss.get_available_space() is None)
2488
2489     def test_status_bad_disk_stats(self):
2490         # If the API to get disk stats exists but a call to it fails, then the status should
2491         # show that no shares will be accepted, and get_available_space() should be 0.
2492         basedir = "storage/WebStatus/status_bad_disk_stats"
2493         fileutil.make_dirs(basedir)
2494         ss = BadDiskStatsServer(basedir, "\x00" * 20)
2495         ss.setServiceParent(self.s)
2496         w = StorageStatus(ss)
2497         html = w.renderSynchronously()
2498         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2499         s = remove_tags(html)
2500         self.failUnless("Accepting new shares: No" in s, s)
2501         self.failUnless("Total disk space: ?" in s, s)
2502         self.failUnless("Space Available to Tahoe: ?" in s, s)
2503         self.failUnlessEqual(ss.get_available_space(), 0)
2504
2505     def test_readonly(self):
2506         basedir = "storage/WebStatus/readonly"
2507         fileutil.make_dirs(basedir)
2508         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2509         ss.setServiceParent(self.s)
2510         w = StorageStatus(ss)
2511         html = w.renderSynchronously()
2512         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2513         s = remove_tags(html)
2514         self.failUnless("Accepting new shares: No" in s, s)
2515
2516     def test_reserved(self):
2517         basedir = "storage/WebStatus/reserved"
2518         fileutil.make_dirs(basedir)
2519         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2520         ss.setServiceParent(self.s)
2521         w = StorageStatus(ss)
2522         html = w.renderSynchronously()
2523         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2524         s = remove_tags(html)
2525         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2526
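    # Illustrative sketch (hypothetical, mirroring the SI abbreviation
    # asserted above and in test_huge_reserved below): sizes are scaled
    # by powers of 1000, shown with two decimals, and followed by the
    # exact byte count in parentheses.
    @staticmethod
    def _example_render_reserved(n):
        for factor, suffix in [(1e12, "TB"), (1e9, "GB"), (1e6, "MB"),
                               (1e3, "kB")]:
            if n >= factor:
                return "%.2f %s (%d)" % (n / factor, suffix, n)
        return "%d B (%d)" % (n, n)
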
2527     def test_huge_reserved(self):
2528         basedir = "storage/WebStatus/huge_reserved"
2529         fileutil.make_dirs(basedir)
2530         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e13)
2531         ss.setServiceParent(self.s)
2532         w = StorageStatus(ss)
2533         html = w.renderSynchronously()
2534         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2535         s = remove_tags(html)
2536         self.failUnless("Reserved space: - 100.00 TB (100000000000000)" in s, s)
2537
2538     def test_util(self):
2539         w = StorageStatus(None)
2540         self.failUnlessEqual(w.render_space(None, None), "?")
2541         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2542         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2543         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2544         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2545         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
2546
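    # Illustrative sketch of the remove_prefix() contract exercised
    # above (a hypothetical re-implementation, not the imported one):
    # return the suffix when the prefix matches, else None.
    @staticmethod
    def _example_remove_prefix(s, prefix):
        if not s.startswith(prefix):
            return None
        return s[len(prefix):]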