import time, os.path, platform, stat, re, simplejson, struct

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
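# FakeCanary stands in for the foolscap "canary" object: a remote reference
# whose disconnect notifications tell the storage server when a client has
# gone away. Tests can simulate a disconnection by invoking the callbacks
# captured in .disconnectors, as test_disconnect does below.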
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)
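        # (the trailing argument to LeaseInfo is the 20-byte server nodeid,
        # all zeros for these tests)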

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

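# RemoteBucket mimics foolscap's calling convention for a remote reference:
# callRemote("name", ...) dispatches to the target's remote_name method and
# wraps the result with defer.maybeDeferred, so the proxy classes under test
# see the same Deferred-based interface a real remote bucket would present.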
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

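        # Working the v1 numbers: the header is 0x24 = 36 bytes, the three
        # hash trees cost 3*7*32 = 672, the share-hash chain costs
        # 3*(2+32) = 102, and the URI extension costs a 4-byte length prefix
        # plus 500 bytes, so 36 + 100 + 672 + 102 + 504 = 1414 (the v2
        # header is 0x44 = 68 bytes, giving 1446).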
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive unless the filesystem supports efficient sparse files (Mac OS X and Windows don't).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the
        exact same storage index) won't add a new entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)
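        # ("mfwgy33dmf2g" is the abbreviated base32 rendering of the storage
        # index "allocate", as included in BucketReader's repr)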

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
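        # (presumably OVERHEAD covers the three 4-byte fields of the
        # immutable share header, and LEASE_SIZE is owner number (4) +
        # renew secret (32) + cancel secret (32) + expiration time (4),
        # i.e. 72 bytes per lease)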
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation and grow to include the overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually adjust the remaining disk space, since we're
        # not doing real disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
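        # (int(3915 / 100) == 39: a 40th writer would push the provisional
        # total past the reservation, so it is refused)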
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
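        # (on POSIX filesystems, writing after a seek past end-of-file
        # creates a "hole" that reads back as zeros without consuming disk
        # space)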
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
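        # each entry in the test-and-write vector maps a sharenum to
        # (test vector, write vector, new length): the test vector is a
        # list of (offset, length, operator, specimen) comparisons, the
        # write vector is a list of (offset, data) writes, and new_length
        # (or None) resizes the share. An empty test vector always passes.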
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
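        # the share now begins with "BAD MAGIC" instead of the mutable
        # container's magic string, so the server should refuse to parse it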
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the comparison operators; in every case the bytes being
        # compared against are the "11111" at offset 10 of the share data.
        # Exercise both failing and passing checks, resetting the data after
        # each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots using the same secrets; that should update
        # the existing lease rather than adding a new one
1144         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1145         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1146
1147         # renew it directly
1148         ss.remote_renew_lease("si1", secrets(0)[1])
1149         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1150
1151         # now allocate them with a bunch of different secrets, to trigger the
1152         # extended lease code. Use add_lease for one of them.
1153         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1154         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1155         secrets2 = secrets(2)
1156         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1157         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1158         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1159         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1160         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1161
1162         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1163
1164         # cancel one of them
1165         ss.remote_cancel_lease("si1", secrets(5)[2])
1166         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1167
1168         all_leases = list(s0.get_leases())
1169         # and write enough data to expand the container, forcing the server
1170         # to move the leases
1171         write("si1", secrets(0),
1172               {0: ([], [(0,data)], 200), },
1173               [])
1174
1175         # read back the leases, make sure they're still intact.
1176         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1177
1178         ss.remote_renew_lease("si1", secrets(0)[1])
1179         ss.remote_renew_lease("si1", secrets(1)[1])
1180         ss.remote_renew_lease("si1", secrets(2)[1])
1181         ss.remote_renew_lease("si1", secrets(3)[1])
1182         ss.remote_renew_lease("si1", secrets(4)[1])
1183         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1184         # get a new copy of the leases, with the current timestamps. Reading
1185         # data and failing to renew/cancel leases should leave the timestamps
1186         # alone.
1187         all_leases = list(s0.get_leases())
1188         # renewing with a bogus token should prompt an error message
1189
1190         # examine the exception thus raised, make sure the old nodeid is
1191         # present, to provide for share migration
1192         e = self.failUnlessRaises(IndexError,
1193                                   ss.remote_renew_lease, "si1",
1194                                   secrets(20)[1])
1195         e_s = str(e)
1196         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1197         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1198         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1199
1200         # same for cancelling
1201         self.failUnlessRaises(IndexError,
1202                               ss.remote_cancel_lease, "si1",
1203                               secrets(20)[2])
1204         self.compare_leases(all_leases, list(s0.get_leases()))
1205
1206         # reading shares should not modify the timestamp
1207         read("si1", [], [(0,200)])
1208         self.compare_leases(all_leases, list(s0.get_leases()))
1209
1210         write("si1", secrets(0),
1211               {0: ([], [(200, "make me bigger")], None)}, [])
1212         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1213
1214         write("si1", secrets(0),
1215               {0: ([], [(500, "make me really bigger")], None)}, [])
1216         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1217
1218         # now cancel them all
1219         ss.remote_cancel_lease("si1", secrets(0)[2])
1220         ss.remote_cancel_lease("si1", secrets(1)[2])
1221         ss.remote_cancel_lease("si1", secrets(2)[2])
1222         ss.remote_cancel_lease("si1", secrets(3)[2])
1223
1224         # the slot should still be there
1225         remaining_shares = read("si1", [], [(0,10)])
1226         self.failUnlessEqual(len(remaining_shares), 1)
1227         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1228
1229         # cancelling a non-existent lease should raise an IndexError
1230         self.failUnlessRaises(IndexError,
1231                               ss.remote_cancel_lease, "si1", "nonsecret")
1232
1233         # and the slot should still be there
1234         remaining_shares = read("si1", [], [(0,10)])
1235         self.failUnlessEqual(len(remaining_shares), 1)
1236         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1237
1238         ss.remote_cancel_lease("si1", secrets(4)[2])
1239         # now the slot should be gone
1240         no_shares = read("si1", [], [(0,10)])
1241         self.failUnlessEqual(no_shares, {})
1242
1243         # cancelling a lease on a non-existent share should raise an IndexError
1244         self.failUnlessRaises(IndexError,
1245                               ss.remote_cancel_lease, "si2", "nonsecret")
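    # A minimal sketch of the lease lifecycle exercised above (hypothetical
    # helper, not used by the tests; ss is a StorageServer, si a storage
    # index, and secrets the usual (write_enabler, renew_secret,
    # cancel_secret) tuple). Each distinct secret pair holds its own lease,
    # and the share survives until its last lease is cancelled.
    def _example_lease_lifecycle(self, ss, si, secrets):
        ss.remote_add_lease(si, secrets[1], secrets[2])  # add (or renew) a lease
        ss.remote_renew_lease(si, secrets[1])            # must match a renew_secret
        ss.remote_cancel_lease(si, secrets[2])           # must match a cancel_secret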
1246
1247     def test_remove(self):
1248         ss = self.create("test_remove")
1249         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1250                       set([0,1,2]), 100)
1251         readv = ss.remote_slot_readv
1252         writev = ss.remote_slot_testv_and_readv_and_writev
1253         secrets = ( self.write_enabler("we1"),
1254                     self.renew_secret("we1"),
1255                     self.cancel_secret("we1") )
1256         # delete sh0 by setting its size to zero
1257         answer = writev("si1", secrets,
1258                         {0: ([], [], 0)},
1259                         [])
1260         # the answer should mention all the shares that existed before the
1261         # write
1262         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1263         # but a new read should show only sh1 and sh2
1264         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1265                              {1: [""], 2: [""]})
1266
1267         # delete sh1 by setting its size to zero
1268         answer = writev("si1", secrets,
1269                         {1: ([], [], 0)},
1270                         [])
1271         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1272         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273                              {2: [""]})
1274
1275         # delete sh2 by setting its size to zero
1276         answer = writev("si1", secrets,
1277                         {2: ([], [], 0)},
1278                         [])
1279         self.failUnlessEqual(answer, (True, {2:[]}) )
1280         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1281                              {})
1282         # and the bucket directory should now be gone
1283         si = base32.b2a("si1")
1284         # note: this is a detail of the storage server implementation, and
1285         # may change in the future
1286         prefix = si[:2]
1287         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1288         bucketdir = os.path.join(prefixdir, si)
1289         self.failUnless(os.path.exists(prefixdir), prefixdir)
1290         self.failIf(os.path.exists(bucketdir), bucketdir)
1291
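# For orientation: a sketch of the delete-by-truncation idiom that
# test_remove relies on (hypothetical helper, not part of the suite). An
# empty test vector means "write unconditionally", new_length=0 deletes the
# share, and the answer covers the shares that existed before the write.
def _example_delete_share(ss, storage_index, secrets, sharenum):
    writev = ss.remote_slot_testv_and_readv_and_writev
    ok, read_data = writev(storage_index, secrets,
                           {sharenum: ([], [], 0)},  # (testv, writev, new_length)
                           [])                       # empty read vector
    return ok, read_data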
1292 class Stats(unittest.TestCase):
1293
1294     def setUp(self):
1295         self.sparent = LoggingServiceParent()
1296         self._lease_secret = itertools.count()
1297     def tearDown(self):
1298         return self.sparent.stopService()
1299
1300     def workdir(self, name):
1301         basedir = os.path.join("storage", "Stats", name)
1302         return basedir
1303
1304     def create(self, name):
1305         workdir = self.workdir(name)
1306         ss = StorageServer(workdir, "\x00" * 20)
1307         ss.setServiceParent(self.sparent)
1308         return ss
1309
1310     def test_latencies(self):
1311         ss = self.create("test_latencies")
1312         for i in range(10000):
1313             ss.add_latency("allocate", 1.0 * i)
1314         for i in range(1000):
1315             ss.add_latency("renew", 1.0 * i)
1316         for i in range(20):
1317             ss.add_latency("write", 1.0 * i)
1318         for i in range(10):
1319             ss.add_latency("cancel", 2.0 * i)
1320         ss.add_latency("get", 5.0)
1321
1322         output = ss.get_latencies()
1323
1324         self.failUnlessEqual(sorted(output.keys()),
1325                              sorted(["allocate", "renew", "cancel", "write", "get"]))
1326         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1327         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1328         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1329         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1330         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1331         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1332         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1333         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1334         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1335
1336         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1337         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1338         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1339         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1340         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1341         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1342         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1343         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1344         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1345
1346         self.failUnlessEqual(len(ss.latencies["write"]), 20)
1347         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
1348         self.failUnless(output["write"]["01_0_percentile"] is None, output)
1349         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
1350         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
1351         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
1352         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
1353         self.failUnless(output["write"]["99_0_percentile"] is None, output)
1354         self.failUnless(output["write"]["99_9_percentile"] is None, output)
1355
1356         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1357         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1358         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
1359         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1360         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1361         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1362         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
1363         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
1364         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
1365
1366         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1367         self.failUnless(output["get"]["mean"] is None, output)
1368         self.failUnless(output["get"]["01_0_percentile"] is None, output)
1369         self.failUnless(output["get"]["10_0_percentile"] is None, output)
1370         self.failUnless(output["get"]["50_0_percentile"] is None, output)
1371         self.failUnless(output["get"]["90_0_percentile"] is None, output)
1372         self.failUnless(output["get"]["95_0_percentile"] is None, output)
1373         self.failUnless(output["get"]["99_0_percentile"] is None, output)
1374         self.failUnless(output["get"]["99_9_percentile"] is None, output)
1375
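# Two rules behind the figures asserted in test_latencies (a sketch inferred
# from the assertions; StorageServer.get_latencies is authoritative): each
# category retains only its most recent 1000 samples, which is why 10000
# allocate samples leave the percentiles in the 9000..9999 range, and a
# percentile is reported as None whenever the category holds too few samples
# to support it (e.g. the 99th percentile of only 20 writes).
def _example_percentile(samples, fraction):
    # index-based percentile: 1000 sorted samples with fraction=0.5 picks
    # the sample at index 500
    ordered = sorted(samples)
    return ordered[int(len(ordered) * fraction)]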
1376 def remove_tags(s):
1377     s = re.sub(r'<[^>]*>', ' ', s)
1378     s = re.sub(r'\s+', ' ', s)
1379     return s
1380
1381 class MyBucketCountingCrawler(BucketCountingCrawler):
1382     def finished_prefix(self, cycle, prefix):
1383         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1384         if self.hook_ds:
1385             d = self.hook_ds.pop(0)
1386             d.callback(None)
1387
1388 class MyStorageServer(StorageServer):
1389     def add_bucket_counter(self):
1390         statefile = os.path.join(self.storedir, "bucket_counter.state")
1391         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1392         self.bucket_counter.setServiceParent(self)
1393
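# The two subclasses above exist so that test_bucket_counter_eta (below) can
# observe the bucket counter mid-cycle: MyStorageServer installs
# MyBucketCountingCrawler, which fires one Deferred from its hook_ds list
# each time a prefix finishes, letting the test sample the status page at
# known points instead of sleeping.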
1394 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1395
1396     def setUp(self):
1397         self.s = service.MultiService()
1398         self.s.startService()
1399     def tearDown(self):
1400         return self.s.stopService()
1401
1402     def test_bucket_counter(self):
1403         basedir = "storage/BucketCounter/bucket_counter"
1404         fileutil.make_dirs(basedir)
1405         ss = StorageServer(basedir, "\x00" * 20)
1406         # to make sure we capture the bucket-counting-crawler in the middle
1407         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1408         # also make it start sooner than usual.
1409         ss.bucket_counter.slow_start = 0
1410         orig_cpu_slice = ss.bucket_counter.cpu_slice
1411         ss.bucket_counter.cpu_slice = 0
1412         ss.setServiceParent(self.s)
1413
1414         w = StorageStatus(ss)
1415
1416         # this sample is before the crawler has started doing anything
1417         html = w.renderSynchronously()
1418         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1419         s = remove_tags(html)
1420         self.failUnlessIn("Accepting new shares: Yes", s)
1421         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1422         self.failUnlessIn("Total buckets: Not computed yet", s)
1423         self.failUnlessIn("Next crawl in", s)
1424
1425         # give the bucket-counting-crawler one tick to get started. The
1426         # cpu_slice=0 will force it to yield right after it processes the
1427         # first prefix
1428
1429         d = fireEventually()
1430         def _check(ignored):
1431             # are we really right after the first prefix?
1432             state = ss.bucket_counter.get_state()
1433             if state["last-complete-prefix"] is None:
1434                 d2 = fireEventually()
1435                 d2.addCallback(_check)
1436                 return d2
1437             self.failUnlessEqual(state["last-complete-prefix"],
1438                                  ss.bucket_counter.prefixes[0])
1439             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1440             html = w.renderSynchronously()
1441             s = remove_tags(html)
1442             self.failUnlessIn(" Current crawl ", s)
1443             self.failUnlessIn(" (next work in ", s)
1444         d.addCallback(_check)
1445
1446         # now give it enough time to complete a full cycle
1447         def _watch():
1448             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1449         d.addCallback(lambda ignored: self.poll(_watch))
1450         def _check2(ignored):
1451             ss.bucket_counter.cpu_slice = orig_cpu_slice
1452             html = w.renderSynchronously()
1453             s = remove_tags(html)
1454             self.failUnlessIn("Total buckets: 0 (the number of", s)
1455             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1456         d.addCallback(_check2)
1457         return d
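    # The wait-for-a-full-cycle idiom above reduces to polling the crawler's
    # progress dict; as a sketch (hypothetical helper, not used by these
    # tests; PollMixin.poll is what actually drives the waiting):
    def _poll_until_cycle_done(self, crawler):
        def _cycle_done():
            return not crawler.get_progress()["cycle-in-progress"]
        return self.poll(_cycle_done)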
1458
1459     def test_bucket_counter_cleanup(self):
1460         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1461         fileutil.make_dirs(basedir)
1462         ss = StorageServer(basedir, "\x00" * 20)
1463         # to make sure we capture the bucket-counting-crawler in the middle
1464         # of a cycle, we reach in and reduce its maximum slice time to 0.
1465         ss.bucket_counter.slow_start = 0
1466         orig_cpu_slice = ss.bucket_counter.cpu_slice
1467         ss.bucket_counter.cpu_slice = 0
1468         ss.setServiceParent(self.s)
1469
1470         d = fireEventually()
1471
1472         def _after_first_prefix(ignored):
1473             state = ss.bucket_counter.state
1474             if state["last-complete-prefix"] is None:
1475                 d2 = fireEventually()
1476                 d2.addCallback(_after_first_prefix)
1477                 return d2
1478             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1479             # now sneak in and mess with its state, to make sure it cleans up
1480             # properly at the end of the cycle
1481             self.failUnlessEqual(state["last-complete-prefix"],
1482                                  ss.bucket_counter.prefixes[0])
1483             state["bucket-counts"][-12] = {}
1484             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1485             ss.bucket_counter.save_state()
1486         d.addCallback(_after_first_prefix)
1487
1488         # now give it enough time to complete a cycle
1489         def _watch():
1490             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1491         d.addCallback(lambda ignored: self.poll(_watch))
1492         def _check2(ignored):
1493             ss.bucket_counter.cpu_slice = orig_cpu_slice
1494             s = ss.bucket_counter.get_state()
1495             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1496             self.failIf("bogusprefix!" in s["storage-index-samples"],
1497                         s["storage-index-samples"].keys())
1498         d.addCallback(_check2)
1499         return d
1500
1501     def test_bucket_counter_eta(self):
1502         basedir = "storage/BucketCounter/bucket_counter_eta"
1503         fileutil.make_dirs(basedir)
1504         ss = MyStorageServer(basedir, "\x00" * 20)
1505         ss.bucket_counter.slow_start = 0
1506         # these will be fired inside finished_prefix()
1507         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1508         w = StorageStatus(ss)
1509
1510         d = defer.Deferred()
1511
1512         def _check_1(ignored):
1513             # no ETA is available yet
1514             html = w.renderSynchronously()
1515             s = remove_tags(html)
1516             self.failUnlessIn("complete (next work", s)
1517
1518         def _check_2(ignored):
1519             # one prefix has finished, so an ETA based upon that elapsed time
1520             # should be available.
1521             html = w.renderSynchronously()
1522             s = remove_tags(html)
1523             self.failUnlessIn("complete (ETA ", s)
1524
1525         def _check_3(ignored):
1526             # two prefixes have finished
1527             html = w.renderSynchronously()
1528             s = remove_tags(html)
1529             self.failUnlessIn("complete (ETA ", s)
1530             d.callback("done")
1531
1532         hooks[0].addCallback(_check_1).addErrback(d.errback)
1533         hooks[1].addCallback(_check_2).addErrback(d.errback)
1534         hooks[2].addCallback(_check_3).addErrback(d.errback)
1535
1536         ss.setServiceParent(self.s)
1537         return d
1538
1539 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1540     stop_after_first_bucket = False
1541     def process_bucket(self, *args, **kwargs):
1542         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1543         if self.stop_after_first_bucket:
1544             self.stop_after_first_bucket = False
1545             self.cpu_slice = -1.0
1546     def yielding(self, sleep_time):
1547         if not self.stop_after_first_bucket:
1548             self.cpu_slice = 500
1549
1550 class BrokenStatResults:
1551     pass
1552 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1553     def stat(self, fn):
1554         s = os.stat(fn)
1555         bsr = BrokenStatResults()
1556         for attrname in dir(s):
1557             if attrname.startswith("_"):
1558                 continue
1559             if attrname == "st_blocks":
1560                 continue
1561             setattr(bsr, attrname, getattr(s, attrname))
1562         return bsr
1563
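# No_ST_BLOCKS_LeaseCheckingCrawler copies every public stat field except
# st_blocks, simulating a platform whose stat() results carry no block
# counts; test_no_st_blocks (below) checks the crawler's fallback accounting.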
1564 class InstrumentedStorageServer(StorageServer):
1565     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1566 class No_ST_BLOCKS_StorageServer(StorageServer):
1567     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1568
1569 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1570
1571     def setUp(self):
1572         self.s = service.MultiService()
1573         self.s.startService()
1574     def tearDown(self):
1575         return self.s.stopService()
1576
1577     def make_shares(self, ss):
1578         def make(si):
1579             return (si, hashutil.tagged_hash("renew", si),
1580                     hashutil.tagged_hash("cancel", si))
1581         def make_mutable(si):
1582             return (si, hashutil.tagged_hash("renew", si),
1583                     hashutil.tagged_hash("cancel", si),
1584                     hashutil.tagged_hash("write-enabler", si))
1585         def make_extra_lease(si, num):
1586             return (hashutil.tagged_hash("renew-%d" % num, si),
1587                     hashutil.tagged_hash("cancel-%d" % num, si))
1588
1589         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1590         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1591         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1592         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1593         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1594         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1595         sharenums = [0]
1596         canary = FakeCanary()
1597         # note: 'tahoe debug dump-share' will not handle this file, since the
1598         # inner contents are not a valid CHK share
1599         data = "\xff" * 1000
1600
1601         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1602                                          1000, canary)
1603         w[0].remote_write(0, data)
1604         w[0].remote_close()
1605
1606         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1607                                          1000, canary)
1608         w[0].remote_write(0, data)
1609         w[0].remote_close()
1610         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1611
1612         writev = ss.remote_slot_testv_and_readv_and_writev
1613         writev(mutable_si_2, (we2, rs2, cs2),
1614                {0: ([], [(0,data)], len(data))}, [])
1615         writev(mutable_si_3, (we3, rs3, cs3),
1616                {0: ([], [(0,data)], len(data))}, [])
1617         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1618
1619         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1620         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1621         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
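        # fixture summary: four buckets with one share each; si_1 and si_3
        # carry two leases apiece and the others one, which is where the
        # {1: 2, 2: 2} leases-per-share histogram asserted below comes from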
1622
1623     def test_basic(self):
1624         basedir = "storage/LeaseCrawler/basic"
1625         fileutil.make_dirs(basedir)
1626         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1627         # make it start sooner than usual.
1628         lc = ss.lease_checker
1629         lc.slow_start = 0
1630         lc.cpu_slice = 500
1631         lc.stop_after_first_bucket = True
1632         webstatus = StorageStatus(ss)
1633
1634         # create a few shares, with some leases on them
1635         self.make_shares(ss)
1636         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1637
1638         # add a non-sharefile to exercise another code path
1639         fn = os.path.join(ss.sharedir,
1640                           storage_index_to_dir(immutable_si_0),
1641                           "not-a-share")
1642         f = open(fn, "wb")
1643         f.write("I am not a share.\n")
1644         f.close()
1645
1646         # this is before the crawl has started, so we're not in a cycle yet
1647         initial_state = lc.get_state()
1648         self.failIf(lc.get_progress()["cycle-in-progress"])
1649         self.failIfIn("cycle-to-date", initial_state)
1650         self.failIfIn("estimated-remaining-cycle", initial_state)
1651         self.failIfIn("estimated-current-cycle", initial_state)
1652         self.failUnlessIn("history", initial_state)
1653         self.failUnlessEqual(initial_state["history"], {})
1654
1655         ss.setServiceParent(self.s)
1656
1657         DAY = 24*60*60
1658
1659         d = fireEventually()
1660
1661         # now examine the state right after the first bucket has been
1662         # processed.
1663         def _after_first_bucket(ignored):
1664             initial_state = lc.get_state()
1665             if "cycle-to-date" not in initial_state:
1666                 d2 = fireEventually()
1667                 d2.addCallback(_after_first_bucket)
1668                 return d2
1669             self.failUnlessIn("cycle-to-date", initial_state)
1670             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1671             self.failUnlessIn("estimated-current-cycle", initial_state)
1672             self.failUnlessIn("history", initial_state)
1673             self.failUnlessEqual(initial_state["history"], {})
1674
1675             so_far = initial_state["cycle-to-date"]
1676             self.failUnlessEqual(so_far["expiration-enabled"], False)
1677             self.failUnlessIn("configured-expiration-mode", so_far)
1678             self.failUnlessIn("lease-age-histogram", so_far)
1679             lah = so_far["lease-age-histogram"]
1680             self.failUnlessEqual(type(lah), list)
1681             self.failUnlessEqual(len(lah), 1)
1682             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1683             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1684             self.failUnlessEqual(so_far["corrupt-shares"], [])
1685             sr1 = so_far["space-recovered"]
1686             self.failUnlessEqual(sr1["examined-buckets"], 1)
1687             self.failUnlessEqual(sr1["examined-shares"], 1)
1688             self.failUnlessEqual(sr1["actual-shares"], 0)
1689             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1690             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1691             left = initial_state["estimated-remaining-cycle"]
1692             sr2 = left["space-recovered"]
1693             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1694             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1695             self.failIfEqual(sr2["actual-shares"], None)
1696             self.failIfEqual(sr2["configured-diskbytes"], None)
1697             self.failIfEqual(sr2["original-sharebytes"], None)
1698         d.addCallback(_after_first_bucket)
1699         d.addCallback(lambda ign: self.render1(webstatus))
1700         def _check_html_in_cycle(html):
1701             s = remove_tags(html)
1702             self.failUnlessIn("So far, this cycle has examined "
1703                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1704             self.failUnlessIn("and has recovered: "
1705                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1706                               "0 B (0 B / 0 B)", s)
1707             self.failUnlessIn("If expiration were enabled, "
1708                               "we would have recovered: "
1709                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1710                               " 0 B (0 B / 0 B) by now", s)
1711             self.failUnlessIn("and the remainder of this cycle "
1712                               "would probably recover: "
1713                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1714                               " 0 B (0 B / 0 B)", s)
1715             self.failUnlessIn("and the whole cycle would probably recover: "
1716                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1717                               " 0 B (0 B / 0 B)", s)
1718             self.failUnlessIn("if we were strictly using each lease's default "
1719                               "31-day lease lifetime", s)
1720             self.failUnlessIn("this cycle would be expected to recover: ", s)
1721         d.addCallback(_check_html_in_cycle)
1722
1723         # wait for the crawler to finish the first cycle. Nothing should have
1724         # been removed.
1725         def _wait():
1726             return lc.get_state()["last-cycle-finished"] is not None
1727         d.addCallback(lambda ign: self.poll(_wait))
1728
1729         def _after_first_cycle(ignored):
1730             s = lc.get_state()
1731             self.failIf("cycle-to-date" in s)
1732             self.failIf("estimated-remaining-cycle" in s)
1733             self.failIf("estimated-current-cycle" in s)
1734             last = s["history"][0]
1735             self.failUnlessIn("cycle-start-finish-times", last)
1736             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1737             self.failUnlessEqual(last["expiration-enabled"], False)
1738             self.failUnlessIn("configured-expiration-mode", last)
1739
1740             self.failUnlessIn("lease-age-histogram", last)
1741             lah = last["lease-age-histogram"]
1742             self.failUnlessEqual(type(lah), list)
1743             self.failUnlessEqual(len(lah), 1)
1744             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1745
1746             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1747             self.failUnlessEqual(last["corrupt-shares"], [])
1748
1749             rec = last["space-recovered"]
1750             self.failUnlessEqual(rec["examined-buckets"], 4)
1751             self.failUnlessEqual(rec["examined-shares"], 4)
1752             self.failUnlessEqual(rec["actual-buckets"], 0)
1753             self.failUnlessEqual(rec["original-buckets"], 0)
1754             self.failUnlessEqual(rec["configured-buckets"], 0)
1755             self.failUnlessEqual(rec["actual-shares"], 0)
1756             self.failUnlessEqual(rec["original-shares"], 0)
1757             self.failUnlessEqual(rec["configured-shares"], 0)
1758             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1759             self.failUnlessEqual(rec["original-diskbytes"], 0)
1760             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1761             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1762             self.failUnlessEqual(rec["original-sharebytes"], 0)
1763             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1764
1765             def _get_sharefile(si):
1766                 return list(ss._iter_share_files(si))[0]
1767             def count_leases(si):
1768                 return len(list(_get_sharefile(si).get_leases()))
1769             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1770             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1771             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1772             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1773         d.addCallback(_after_first_cycle)
1774         d.addCallback(lambda ign: self.render1(webstatus))
1775         def _check_html(html):
1776             s = remove_tags(html)
1777             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1778                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1779             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1780                               "(2 mutable / 2 immutable),", s)
1781             self.failUnlessIn("but expiration was not enabled", s)
1782         d.addCallback(_check_html)
1783         d.addCallback(lambda ign: self.render_json(webstatus))
1784         def _check_json(json):
1785             data = simplejson.loads(json)
1786             self.failUnlessIn("lease-checker", data)
1787             self.failUnlessIn("lease-checker-progress", data)
1788         d.addCallback(_check_json)
1789         return d
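    # The state inspected throughout these tests is a plain dict; as a sketch,
    # pulling the most recently completed cycle looks like this (hypothetical
    # helper; "history" maps cycle number -> per-cycle summary dict):
    def _example_last_cycle(self, lease_checker):
        history = lease_checker.get_state()["history"]
        if not history:
            return None
        return history[max(history.keys())]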
1790
1791     def backdate_lease(self, sf, renew_secret, new_expire_time):
1792         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1793         # "renew" a lease with a new_expire_time that is older than what the
1794         # current lease has), so we have to reach inside it.
1795         for i,lease in enumerate(sf.get_leases()):
1796             if lease.renew_secret == renew_secret:
1797                 lease.expiration_time = new_expire_time
1798                 f = open(sf.home, 'rb+')
1799                 sf._write_lease_record(f, i, lease)
1800                 f.close()
1801                 return
1802         raise IndexError("unable to renew non-existent lease")
1803
1804     def test_expire_age(self):
1805         basedir = "storage/LeaseCrawler/expire_age"
1806         fileutil.make_dirs(basedir)
1807         # setting expiration_override_lease_duration to 2000 means that any
1808         # lease which is more than 2000s old will be expired.
1809         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1810                                        expiration_enabled=True,
1811                                        expiration_mode="age",
1812                                        expiration_override_lease_duration=2000)
1813         # make it start sooner than usual.
1814         lc = ss.lease_checker
1815         lc.slow_start = 0
1816         lc.stop_after_first_bucket = True
1817         webstatus = StorageStatus(ss)
1818
1819         # create a few shares, with some leases on them
1820         self.make_shares(ss)
1821         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1822
1823         def count_shares(si):
1824             return len(list(ss._iter_share_files(si)))
1825         def _get_sharefile(si):
1826             return list(ss._iter_share_files(si))[0]
1827         def count_leases(si):
1828             return len(list(_get_sharefile(si).get_leases()))
1829
1830         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1831         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1832         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1833         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1834         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1835         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1836         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1837         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1838
1839         # artificially crank back the expiration time on the first lease of
1840         # each share, to make it look like it expired already (age=1000s).
1841         # Some shares have an extra lease which is set to expire at the
1842         # default time in 31 days from now (age=31days). We then run the
1843         # crawler, which will expire the first lease, making some shares get
1844         # deleted and others stay alive (with one remaining lease)
1845         now = time.time()
1846
1847         sf0 = _get_sharefile(immutable_si_0)
1848         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1849         sf0_size = os.stat(sf0.home).st_size
1850
1851         # immutable_si_1 gets an extra lease
1852         sf1 = _get_sharefile(immutable_si_1)
1853         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1854
1855         sf2 = _get_sharefile(mutable_si_2)
1856         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1857         sf2_size = os.stat(sf2.home).st_size
1858
1859         # mutable_si_3 gets an extra lease
1860         sf3 = _get_sharefile(mutable_si_3)
1861         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1862
1863         ss.setServiceParent(self.s)
1864
1865         d = fireEventually()
1866         # examine the state right after the first bucket has been processed
1867         def _after_first_bucket(ignored):
1868             p = lc.get_progress()
1869             if not p["cycle-in-progress"]:
1870                 d2 = fireEventually()
1871                 d2.addCallback(_after_first_bucket)
1872                 return d2
1873         d.addCallback(_after_first_bucket)
1874         d.addCallback(lambda ign: self.render1(webstatus))
1875         def _check_html_in_cycle(html):
1876             s = remove_tags(html)
1877             # the first bucket encountered gets deleted, and its prefix
1878             # happens to be about 1/5th of the way through the ring, so the
1879             # predictor thinks we'll have 5 shares and that we'll delete them
1880             # all. This part of the test depends upon the SIs landing right
1881             # where they do now.
1882             self.failUnlessIn("The remainder of this cycle is expected to "
1883                               "recover: 4 shares, 4 buckets", s)
1884             self.failUnlessIn("The whole cycle is expected to examine "
1885                               "5 shares in 5 buckets and to recover: "
1886                               "5 shares, 5 buckets", s)
1887         d.addCallback(_check_html_in_cycle)
1888
1889         # wait for the crawler to finish the first cycle. Two shares should
1890         # have been removed
1891         def _wait():
1892             return lc.get_state()["last-cycle-finished"] is not None
1893         d.addCallback(lambda ign: self.poll(_wait))
1894
1895         def _after_first_cycle(ignored):
1896             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1897             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1898             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1899             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1900             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1901             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1902
1903             s = lc.get_state()
1904             last = s["history"][0]
1905
1906             self.failUnlessEqual(last["expiration-enabled"], True)
1907             self.failUnlessEqual(last["configured-expiration-mode"],
1908                                  ("age", 2000, None, ("mutable", "immutable")))
1909             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1910
1911             rec = last["space-recovered"]
1912             self.failUnlessEqual(rec["examined-buckets"], 4)
1913             self.failUnlessEqual(rec["examined-shares"], 4)
1914             self.failUnlessEqual(rec["actual-buckets"], 2)
1915             self.failUnlessEqual(rec["original-buckets"], 2)
1916             self.failUnlessEqual(rec["configured-buckets"], 2)
1917             self.failUnlessEqual(rec["actual-shares"], 2)
1918             self.failUnlessEqual(rec["original-shares"], 2)
1919             self.failUnlessEqual(rec["configured-shares"], 2)
1920             size = sf0_size + sf2_size
1921             self.failUnlessEqual(rec["actual-sharebytes"], size)
1922             self.failUnlessEqual(rec["original-sharebytes"], size)
1923             self.failUnlessEqual(rec["configured-sharebytes"], size)
1924             # different platforms have different notions of "blocks used by
1925             # this file", so merely assert that it's a number
1926             self.failUnless(rec["actual-diskbytes"] >= 0,
1927                             rec["actual-diskbytes"])
1928             self.failUnless(rec["original-diskbytes"] >= 0,
1929                             rec["original-diskbytes"])
1930             self.failUnless(rec["configured-diskbytes"] >= 0,
1931                             rec["configured-diskbytes"])
1932         d.addCallback(_after_first_cycle)
1933         d.addCallback(lambda ign: self.render1(webstatus))
1934         def _check_html(html):
1935             s = remove_tags(html)
1936             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1937             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1938             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1939         d.addCallback(_check_html)
1940         return d
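    # (the "33 minutes" asserted above is just the 2000s override rounded
    # down to whole minutes: 2000 // 60 == 33)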
1941
1942     def test_expire_cutoff_date(self):
1943         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1944         fileutil.make_dirs(basedir)
1945         # setting cutoff-date to 2000 seconds ago means that any lease which
1946         # is more than 2000s old will be expired.
1947         now = time.time()
1948         then = int(now - 2000)
1949         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1950                                        expiration_enabled=True,
1951                                        expiration_mode="cutoff-date",
1952                                        expiration_cutoff_date=then)
1953         # make it start sooner than usual.
1954         lc = ss.lease_checker
1955         lc.slow_start = 0
1956         lc.stop_after_first_bucket = True
1957         webstatus = StorageStatus(ss)
1958
1959         # create a few shares, with some leases on them
1960         self.make_shares(ss)
1961         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1962
1963         def count_shares(si):
1964             return len(list(ss._iter_share_files(si)))
1965         def _get_sharefile(si):
1966             return list(ss._iter_share_files(si))[0]
1967         def count_leases(si):
1968             return len(list(_get_sharefile(si).get_leases()))
1969
1970         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1971         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1972         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1973         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1974         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1975         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1976         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1977         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1978
1979         # artificially crank back the expiration time on the first lease of
1980         # each share, to make it look like it was renewed 3000s ago. To achieve
1981         # this, we need to set the expiration time to now-3000+31days. This
1982         # will change when the lease format is improved to contain both
1983         # create/renew time and duration.
1984         new_expiration_time = now - 3000 + 31*24*60*60
1985
1986         # Some shares have an extra lease which is set to expire at the
1987         # default time in 31 days from now (age=31days). We then run the
1988         # crawler, which will expire the first lease, making some shares get
1989         # deleted and others stay alive (with one remaining lease)
1990
1991         sf0 = _get_sharefile(immutable_si_0)
1992         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1993         sf0_size = os.stat(sf0.home).st_size
1994
1995         # immutable_si_1 gets an extra lease
1996         sf1 = _get_sharefile(immutable_si_1)
1997         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1998
1999         sf2 = _get_sharefile(mutable_si_2)
2000         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2001         sf2_size = os.stat(sf2.home).st_size
2002
2003         # mutable_si_3 gets an extra lease
2004         sf3 = _get_sharefile(mutable_si_3)
2005         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2006
2007         ss.setServiceParent(self.s)
2008
2009         d = fireEventually()
2010         # examine the state right after the first bucket has been processed
2011         def _after_first_bucket(ignored):
2012             p = lc.get_progress()
2013             if not p["cycle-in-progress"]:
2014                 d2 = fireEventually()
2015                 d2.addCallback(_after_first_bucket)
2016                 return d2
2017         d.addCallback(_after_first_bucket)
2018         d.addCallback(lambda ign: self.render1(webstatus))
2019         def _check_html_in_cycle(html):
2020             s = remove_tags(html)
2021             # the first bucket encountered gets deleted, and its prefix
2022             # happens to be about 1/5th of the way through the ring, so the
2023             # predictor thinks we'll have 5 shares and that we'll delete them
2024             # all. This part of the test depends upon the SIs landing right
2025             # where they do now.
2026             self.failUnlessIn("The remainder of this cycle is expected to "
2027                               "recover: 4 shares, 4 buckets", s)
2028             self.failUnlessIn("The whole cycle is expected to examine "
2029                               "5 shares in 5 buckets and to recover: "
2030                               "5 shares, 5 buckets", s)
2031         d.addCallback(_check_html_in_cycle)
2032
2033         # wait for the crawler to finish the first cycle. Two shares should
2034         # have been removed
2035         def _wait():
2036             return lc.get_state()["last-cycle-finished"] is not None
2037         d.addCallback(lambda ign: self.poll(_wait))
2038
2039         def _after_first_cycle(ignored):
2040             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2041             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2042             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2043             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2044             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2045             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2046
2047             s = lc.get_state()
2048             last = s["history"][0]
2049
2050             self.failUnlessEqual(last["expiration-enabled"], True)
2051             self.failUnlessEqual(last["configured-expiration-mode"],
2052                                  ("cutoff-date", None, then,
2053                                   ("mutable", "immutable")))
2054             self.failUnlessEqual(last["leases-per-share-histogram"],
2055                                  {1: 2, 2: 2})
2056
2057             rec = last["space-recovered"]
2058             self.failUnlessEqual(rec["examined-buckets"], 4)
2059             self.failUnlessEqual(rec["examined-shares"], 4)
2060             self.failUnlessEqual(rec["actual-buckets"], 2)
2061             self.failUnlessEqual(rec["original-buckets"], 0)
2062             self.failUnlessEqual(rec["configured-buckets"], 2)
2063             self.failUnlessEqual(rec["actual-shares"], 2)
2064             self.failUnlessEqual(rec["original-shares"], 0)
2065             self.failUnlessEqual(rec["configured-shares"], 2)
2066             size = sf0_size + sf2_size
2067             self.failUnlessEqual(rec["actual-sharebytes"], size)
2068             self.failUnlessEqual(rec["original-sharebytes"], 0)
2069             self.failUnlessEqual(rec["configured-sharebytes"], size)
2070             # different platforms have different notions of "blocks used by
2071             # this file", so merely assert that it's a number
2072             self.failUnless(rec["actual-diskbytes"] >= 0,
2073                             rec["actual-diskbytes"])
2074             self.failUnless(rec["original-diskbytes"] >= 0,
2075                             rec["original-diskbytes"])
2076             self.failUnless(rec["configured-diskbytes"] >= 0,
2077                             rec["configured-diskbytes"])
2078         d.addCallback(_after_first_cycle)
2079         d.addCallback(lambda ign: self.render1(webstatus))
2080         def _check_html(html):
2081             s = remove_tags(html)
2082             self.failUnlessIn("Expiration Enabled:"
2083                               " expired leases will be removed", s)
2084             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2085             substr = "Leases created or last renewed before %s will be considered expired." % date
2086             self.failUnlessIn(substr, s)
2087             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2088         d.addCallback(_check_html)
2089         return d
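    # Note the contrast with test_expire_age: here the "original-*" counters
    # stay at 0. The likely reading (hedged; expirer.py is authoritative) is
    # that "original" counts space whose leases have passed their own
    # recorded expiration time, and these backdated leases still expire weeks
    # from now, so only the configured cutoff-date policy catches them.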
2090
2091     def test_only_immutable(self):
2092         basedir = "storage/LeaseCrawler/only_immutable"
2093         fileutil.make_dirs(basedir)
2094         now = time.time()
2095         then = int(now - 2000)
2096         ss = StorageServer(basedir, "\x00" * 20,
2097                            expiration_enabled=True,
2098                            expiration_mode="cutoff-date",
2099                            expiration_cutoff_date=then,
2100                            expiration_sharetypes=("immutable",))
2101         lc = ss.lease_checker
2102         lc.slow_start = 0
2103         webstatus = StorageStatus(ss)
2104
2105         self.make_shares(ss)
2106         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2107         # set all leases to be expirable
2108         new_expiration_time = now - 3000 + 31*24*60*60
2109
2110         def count_shares(si):
2111             return len(list(ss._iter_share_files(si)))
2112         def _get_sharefile(si):
2113             return list(ss._iter_share_files(si))[0]
2114         def count_leases(si):
2115             return len(list(_get_sharefile(si).get_leases()))
2116
2117         sf0 = _get_sharefile(immutable_si_0)
2118         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2119         sf1 = _get_sharefile(immutable_si_1)
2120         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2121         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2122         sf2 = _get_sharefile(mutable_si_2)
2123         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2124         sf3 = _get_sharefile(mutable_si_3)
2125         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2126         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2127
2128         ss.setServiceParent(self.s)
2129         def _wait():
2130             return lc.get_state()["last-cycle-finished"] is not None
2131         d = self.poll(_wait)
2132
2133         def _after_first_cycle(ignored):
2134             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2135             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2136             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2137             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2138             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2139             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2140         d.addCallback(_after_first_cycle)
2141         d.addCallback(lambda ign: self.render1(webstatus))
2142         def _check_html(html):
2143             s = remove_tags(html)
2144             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2145         d.addCallback(_check_html)
2146         return d
2147
2148     def test_only_mutable(self):
2149         basedir = "storage/LeaseCrawler/only_mutable"
2150         fileutil.make_dirs(basedir)
2151         now = time.time()
2152         then = int(now - 2000)
2153         ss = StorageServer(basedir, "\x00" * 20,
2154                            expiration_enabled=True,
2155                            expiration_mode="cutoff-date",
2156                            expiration_cutoff_date=then,
2157                            expiration_sharetypes=("mutable",))
2158         lc = ss.lease_checker
2159         lc.slow_start = 0
2160         webstatus = StorageStatus(ss)
2161
2162         self.make_shares(ss)
2163         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2164         # set all leases to be expirable
2165         new_expiration_time = now - 3000 + 31*24*60*60
2166
2167         def count_shares(si):
2168             return len(list(ss._iter_share_files(si)))
2169         def _get_sharefile(si):
2170             return list(ss._iter_share_files(si))[0]
2171         def count_leases(si):
2172             return len(list(_get_sharefile(si).get_leases()))
2173
2174         sf0 = _get_sharefile(immutable_si_0)
2175         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2176         sf1 = _get_sharefile(immutable_si_1)
2177         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2178         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2179         sf2 = _get_sharefile(mutable_si_2)
2180         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2181         sf3 = _get_sharefile(mutable_si_3)
2182         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2183         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2184
2185         ss.setServiceParent(self.s)
2186         def _wait():
2187             return lc.get_state()["last-cycle-finished"] is not None
2188         d = self.poll(_wait)
2189
2190         def _after_first_cycle(ignored):
2191             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2192             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2193             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2194             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2195             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2196             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2197         d.addCallback(_after_first_cycle)
2198         d.addCallback(lambda ign: self.render1(webstatus))
2199         def _check_html(html):
2200             s = remove_tags(html)
2201             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2202         d.addCallback(_check_html)
2203         return d
2204
2205     def test_bad_mode(self):
2206         basedir = "storage/LeaseCrawler/bad_mode"
2207         fileutil.make_dirs(basedir)
2208         e = self.failUnlessRaises(ValueError,
2209                                   StorageServer, basedir, "\x00" * 20,
2210                                   expiration_mode="bogus")
2211         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2212
2213     def test_parse_duration(self):
2214         DAY = 24*60*60
2215         MONTH = 31*DAY
2216         YEAR = 365*DAY
2217         p = time_format.parse_duration
2218         self.failUnlessEqual(p("7days"), 7*DAY)
2219         self.failUnlessEqual(p("31day"), 31*DAY)
2220         self.failUnlessEqual(p("60 days"), 60*DAY)
2221         self.failUnlessEqual(p("2mo"), 2*MONTH)
2222         self.failUnlessEqual(p("3 month"), 3*MONTH)
2223         self.failUnlessEqual(p("2years"), 2*YEAR)
2224         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2225         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
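        # the unit table exercised above: day = 86400s, month = 31 days,
        # year = 365 days; units may be singular, plural, or abbreviated
        # ("mo"), with optional whitespace before the unit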
2226
2227     def test_parse_date(self):
2228         p = time_format.parse_date
2229         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2230         self.failUnlessEqual(p("2009-03-18"), 1237334400)
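        # sanity check on the constant: 1237334400 / 86400 == 14321 whole
        # days since the epoch, which is midnight UTC on 2009-03-18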
2231
2232     def test_limited_history(self):
2233         basedir = "storage/LeaseCrawler/limited_history"
2234         fileutil.make_dirs(basedir)
2235         ss = StorageServer(basedir, "\x00" * 20)
2236         # make it start sooner than usual.
2237         lc = ss.lease_checker
2238         lc.slow_start = 0
2239         lc.cpu_slice = 500
2240
2241         # create a few shares, with some leases on them
2242         self.make_shares(ss)
2243
2244         ss.setServiceParent(self.s)
2245
2246         def _wait_until_15_cycles_done():
2247             last = lc.state["last-cycle-finished"]
2248             if last is not None and last >= 15:
2249                 return True
2250             if lc.timer:
2251                 lc.timer.reset(0)
2252             return False
2253         d = self.poll(_wait_until_15_cycles_done)
2254
2255         def _check(ignored):
2256             s = lc.get_state()
2257             h = s["history"]
2258             self.failUnlessEqual(len(h), 10)
2259             self.failUnlessEqual(max(h.keys()), 15)
2260             self.failUnlessEqual(min(h.keys()), 6)
2261         d.addCallback(_check)
2262         return d
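    # the retention policy demonstrated above: only the ten most recent
    # cycles survive in "history", so after cycle 15 finishes the oldest
    # surviving entry is cycle 6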
2263
2264     def test_unpredictable_future(self):
2265         basedir = "storage/LeaseCrawler/unpredictable_future"
2266         fileutil.make_dirs(basedir)
2267         ss = StorageServer(basedir, "\x00" * 20)
2268         # make it start sooner than usual.
2269         lc = ss.lease_checker
2270         lc.slow_start = 0
2271         lc.cpu_slice = -1.0 # stop quickly
2272
2273         self.make_shares(ss)
2274
2275         ss.setServiceParent(self.s)
2276
2277         d = fireEventually()
2278         def _check(ignored):
2279             # this should fire after the first bucket is complete, but before
2280             # the first prefix is complete, so the progress-measurer won't
2281             # think we've gotten far enough to raise our percent-complete
2282             # above 0%, triggering the cannot-predict-the-future code in
2283             # expirer.py . This will have to change if/when the
2284             # expirer.py. This will have to change if/when the
2285             # have to interrupt it even earlier, before it's finished the
2286             # first bucket).
2287             s = lc.get_state()
2288             if "cycle-to-date" not in s:
2289                 d2 = fireEventually()
2290                 d2.addCallback(_check)
2291                 return d2
2292             self.failUnlessIn("cycle-to-date", s)
2293             self.failUnlessIn("estimated-remaining-cycle", s)
2294             self.failUnlessIn("estimated-current-cycle", s)
2295
2296             left = s["estimated-remaining-cycle"]["space-recovered"]
2297             self.failUnlessEqual(left["actual-buckets"], None)
2298             self.failUnlessEqual(left["original-buckets"], None)
2299             self.failUnlessEqual(left["configured-buckets"], None)
2300             self.failUnlessEqual(left["actual-shares"], None)
2301             self.failUnlessEqual(left["original-shares"], None)
2302             self.failUnlessEqual(left["configured-shares"], None)
2303             self.failUnlessEqual(left["actual-diskbytes"], None)
2304             self.failUnlessEqual(left["original-diskbytes"], None)
2305             self.failUnlessEqual(left["configured-diskbytes"], None)
2306             self.failUnlessEqual(left["actual-sharebytes"], None)
2307             self.failUnlessEqual(left["original-sharebytes"], None)
2308             self.failUnlessEqual(left["configured-sharebytes"], None)
2309
2310             full = s["estimated-current-cycle"]["space-recovered"]
2311             self.failUnlessEqual(full["actual-buckets"], None)
2312             self.failUnlessEqual(full["original-buckets"], None)
2313             self.failUnlessEqual(full["configured-buckets"], None)
2314             self.failUnlessEqual(full["actual-shares"], None)
2315             self.failUnlessEqual(full["original-shares"], None)
2316             self.failUnlessEqual(full["configured-shares"], None)
2317             self.failUnlessEqual(full["actual-diskbytes"], None)
2318             self.failUnlessEqual(full["original-diskbytes"], None)
2319             self.failUnlessEqual(full["configured-diskbytes"], None)
2320             self.failUnlessEqual(full["actual-sharebytes"], None)
2321             self.failUnlessEqual(full["original-sharebytes"], None)
2322             self.failUnlessEqual(full["configured-sharebytes"], None)
2323
2324         d.addCallback(_check)
2325         return d
2326
2327     def test_no_st_blocks(self):
2328         basedir = "storage/LeaseCrawler/no_st_blocks"
2329         fileutil.make_dirs(basedir)
2330         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2331                                         expiration_mode="age",
2332                                         expiration_override_lease_duration=-1000)
2333         # a negative expiration_override_lease_duration= means the
2334         # "configured-" space-recovered counts will be non-zero, since all
2335         # shares will have expired by then
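        # (worked example: with expiration_override_lease_duration=-1000, a
        #  lease granted at time T is treated as expiring at T-1000, i.e.
        #  before it was even created, so every share counts as expired)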
2336
2337         # make it start sooner than usual.
2338         lc = ss.lease_checker
2339         lc.slow_start = 0
2340
2341         self.make_shares(ss)
2342         ss.setServiceParent(self.s)
2343         def _wait():
2344             return lc.get_state()["last-cycle-finished"] is not None
2345         d = self.poll(_wait)
2346
2347         def _check(ignored):
2348             s = lc.get_state()
2349             last = s["history"][0]
2350             rec = last["space-recovered"]
2351             self.failUnlessEqual(rec["configured-buckets"], 4)
2352             self.failUnlessEqual(rec["configured-shares"], 4)
2353             self.failUnless(rec["configured-sharebytes"] > 0,
2354                             rec["configured-sharebytes"])
2355             # without the .st_blocks field in os.stat() results, we should be
2356             # reporting diskbytes==sharebytes
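            # (The lease checker presumably takes sharebytes from st_size and
            #  diskbytes from st_blocks * 512 when that field exists, falling
            #  back to st_size when it does not -- the situation
            #  No_ST_BLOCKS_StorageServer simulates.)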
2357             self.failUnlessEqual(rec["configured-sharebytes"],
2358                                  rec["configured-diskbytes"])
2359         d.addCallback(_check)
2360         return d
2361
2362     def test_share_corruption(self):
2363         self._poll_should_ignore_these_errors = [
2364             UnknownMutableContainerVersionError,
2365             UnknownImmutableContainerVersionError,
2366             ]
2367         basedir = "storage/LeaseCrawler/share_corruption"
2368         fileutil.make_dirs(basedir)
2369         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2370         w = StorageStatus(ss)
2371         # make it start sooner than usual.
2372         lc = ss.lease_checker
2373         lc.stop_after_first_bucket = True
2374         lc.slow_start = 0
2375         lc.cpu_slice = 500
2376
2377         # create a few shares, with some leases on them
2378         self.make_shares(ss)
2379
2380         # now corrupt one, and make sure the lease-checker keeps going
2381         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2382         first = min(self.sis)
2383         first_b32 = base32.b2a(first)
2384         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2385         f = open(fn, "rb+")
2386         f.seek(0)
2387         f.write("BAD MAGIC")
2388         f.close()
2389         # if get_share_file() doesn't see the correct mutable magic, it
2390         # assumes the file is an immutable share, and then
2391         # immutable.ShareFile sees a bad version. So regardless of which kind
2392         # of share we corrupted, this will trigger an
2393         # UnknownImmutableContainerVersionError.
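        # A minimal sketch of that detection logic (hedged -- the real code
        # lives elsewhere in allmydata.storage and may differ in detail):
        #     def get_share_file(filename):
        #         f = open(filename, "rb")
        #         prefix = f.read(32)
        #         f.close()
        #         if prefix == MutableShareFile.MAGIC:
        #             return MutableShareFile(filename)
        #         return ShareFile(filename)  # assumed immutable; checks version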
2394
2395         # also create an empty bucket
2396         empty_si = base32.b2a("\x04"*16)
2397         empty_bucket_dir = os.path.join(ss.sharedir,
2398                                         storage_index_to_dir(empty_si))
2399         fileutil.make_dirs(empty_bucket_dir)
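        # (This empty bucket still gets examined: the end-of-cycle check below
        #  expects examined-buckets == 5 -- four buckets with shares plus this
        #  empty one -- but only 3 examined shares, because the corrupted
        #  share is reported as corrupt rather than examined.)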
2400
2401         ss.setServiceParent(self.s)
2402
2403         d = fireEventually()
2404
2405         # now examine the state right after the first bucket has been
2406         # processed.
2407         def _after_first_bucket(ignored):
2408             s = lc.get_state()
2409             if "cycle-to-date" not in s:
2410                 d2 = fireEventually()
2411                 d2.addCallback(_after_first_bucket)
2412                 return d2
2413             so_far = s["cycle-to-date"]
2414             rec = so_far["space-recovered"]
2415             self.failUnlessEqual(rec["examined-buckets"], 1)
2416             self.failUnlessEqual(rec["examined-shares"], 0)
2417             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2418         d.addCallback(_after_first_bucket)
2419
2420         d.addCallback(lambda ign: self.render_json(w))
2421         def _check_json(json):
2422             data = simplejson.loads(json)
2423             # grr. json turns all dict keys into strings.
2424             so_far = data["lease-checker"]["cycle-to-date"]
2425             corrupt_shares = so_far["corrupt-shares"]
2426             # it also turns all tuples into lists
2427             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
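            # (e.g. simplejson.loads(simplejson.dumps({0: ("si", 0)}))
            #  == {"0": ["si", 0]})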
2428         d.addCallback(_check_json)
2429         d.addCallback(lambda ign: self.render1(w))
2430         def _check_html(html):
2431             s = remove_tags(html)
2432             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2433         d.addCallback(_check_html)
2434
2435         def _wait():
2436             return lc.get_state()["last-cycle-finished"] is not None
2437         d.addCallback(lambda ign: self.poll(_wait))
2438
2439         def _after_first_cycle(ignored):
2440             s = lc.get_state()
2441             last = s["history"][0]
2442             rec = last["space-recovered"]
2443             self.failUnlessEqual(rec["examined-buckets"], 5)
2444             self.failUnlessEqual(rec["examined-shares"], 3)
2445             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2446         d.addCallback(_after_first_cycle)
2447         d.addCallback(lambda ign: self.render_json(w))
2448         def _check_json_history(json):
2449             data = simplejson.loads(json)
2450             last = data["lease-checker"]["history"]["0"]
2451             corrupt_shares = last["corrupt-shares"]
2452             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2453         d.addCallback(_check_json_history)
2454         d.addCallback(lambda ign: self.render1(w))
2455         def _check_html_history(html):
2456             s = remove_tags(html)
2457             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2458         d.addCallback(_check_html_history)
2459
2460         def _cleanup(res):
2461             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2462                                    UnknownImmutableContainerVersionError)
2463             return res
2464         d.addBoth(_cleanup)
2465         return d
2466
2467     def render_json(self, page):
2468         d = self.render1(page, args={"t": ["json"]})
2469         return d
2470
2471 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2472
2473     def setUp(self):
2474         self.s = service.MultiService()
2475         self.s.startService()
2476     def tearDown(self):
2477         return self.s.stopService()
2478
2479     def test_no_server(self):
2480         w = StorageStatus(None)
2481         html = w.renderSynchronously()
2482         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
2483
2484     def test_status(self):
2485         basedir = "storage/WebStatus/status"
2486         fileutil.make_dirs(basedir)
2487         ss = StorageServer(basedir, "\x00" * 20)
2488         ss.setServiceParent(self.s)
2489         w = StorageStatus(ss)
2490         d = self.render1(w)
2491         def _check_html(html):
2492             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2493             s = remove_tags(html)
2494             self.failUnlessIn("Accepting new shares: Yes", s)
2495             self.failUnlessIn("Reserved space: - 0 B (0)", s)
2496         d.addCallback(_check_html)
2497         d.addCallback(lambda ign: self.render_json(w))
2498         def _check_json(json):
2499             data = simplejson.loads(json)
2500             s = data["stats"]
2501             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2502             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2503             self.failUnlessIn("bucket-counter", data)
2504             self.failUnlessIn("lease-checker", data)
2505         d.addCallback(_check_json)
2506         return d
2507
2508     def render_json(self, page):
2509         d = self.render1(page, args={"t": ["json"]})
2510         return d
2511
2512     @mock.patch('allmydata.util.fileutil.get_disk_stats')
2513     def test_status_no_disk_stats(self, mock_get_disk_stats):
2514         mock_get_disk_stats.side_effect = AttributeError()
2515
2516         # Some platforms have no disk-stats API; make sure the code can
2517         # handle that gracefully (this test itself runs on all platforms).
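        # (fileutil.get_disk_stats presumably leans on a platform call such as
        #  os.statvfs, which is missing on some platforms; the AttributeError
        #  side_effect above stands in for that missing API.)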
2518         basedir = "storage/WebStatus/status_no_disk_stats"
2519         fileutil.make_dirs(basedir)
2520         ss = StorageServer(basedir, "\x00" * 20)
2521         ss.setServiceParent(self.s)
2522         w = StorageStatus(ss)
2523         html = w.renderSynchronously()
2524         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2525         s = remove_tags(html)
2526         self.failUnlessIn("Accepting new shares: Yes", s)
2527         self.failUnlessIn("Total disk space: ?", s)
2528         self.failUnlessIn("Space Available to Tahoe: ?", s)
2529         self.failUnless(ss.get_available_space() is None)
2530
2531     @mock.patch('allmydata.util.fileutil.get_disk_stats')
2532     def test_status_bad_disk_stats(self, mock_get_disk_stats):
2533         mock_get_disk_stats.side_effect = OSError()
2534
2535         # If the API to get disk stats exists but a call to it fails, then the status should
2536         # show that no shares will be accepted, and get_available_space() should be 0.
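        # (An OSError stands in for a statvfs-style call failing at runtime,
        #  e.g. on a stale mount; the assertions below check that the server
        #  fails safe: zero available space and no new shares accepted.)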
2537         basedir = "storage/WebStatus/status_bad_disk_stats"
2538         fileutil.make_dirs(basedir)
2539         ss = StorageServer(basedir, "\x00" * 20)
2540         ss.setServiceParent(self.s)
2541         w = StorageStatus(ss)
2542         html = w.renderSynchronously()
2543         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2544         s = remove_tags(html)
2545         self.failUnlessIn("Accepting new shares: No", s)
2546         self.failUnlessIn("Total disk space: ?", s)
2547         self.failUnlessIn("Space Available to Tahoe: ?", s)
2548         self.failUnlessEqual(ss.get_available_space(), 0)
2549
2550     def test_readonly(self):
2551         basedir = "storage/WebStatus/readonly"
2552         fileutil.make_dirs(basedir)
2553         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2554         ss.setServiceParent(self.s)
2555         w = StorageStatus(ss)
2556         html = w.renderSynchronously()
2557         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2558         s = remove_tags(html)
2559         self.failUnlessIn("Accepting new shares: No", s)
2560
2561     def test_reserved(self):
2562         basedir = "storage/WebStatus/reserved"
2563         fileutil.make_dirs(basedir)
2564         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2565         ss.setServiceParent(self.s)
2566         w = StorageStatus(ss)
2567         html = w.renderSynchronously()
2568         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2569         s = remove_tags(html)
2570         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2571
2572     def test_huge_reserved(self):
2573         basedir = "storage/WebStatus/huge_reserved"
2574         fileutil.make_dirs(basedir)
2575         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e12)
2576         ss.setServiceParent(self.s)
2577         w = StorageStatus(ss)
2578         html = w.renderSynchronously()
2579         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2580         s = remove_tags(html)
2581         self.failUnlessIn("Reserved space: - 15.00 TB (15000000000000)", s)
2582
2583     def test_util(self):
2584         w = StorageStatus(None)
2585         self.failUnlessEqual(w.render_space(None, None), "?")
2586         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2587         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2588         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2589         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2590         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)