import time, os.path, platform, stat, re, simplejson, struct

from allmydata.util import log

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
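# Test double for the foolscap "canary": the remote reference whose
# disconnect notifications tell the storage server that a client has gone
# away. It records the registered disconnectors so tests can fire them by
# hand (see test_disconnect below).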
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

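# Loopback stand-in for a foolscap RemoteReference: callRemote("name", ...)
# dispatches to self.target.remote_name(...) and wraps the result in a
# Deferred, mimicking the remote-call interface without any networking.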
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
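        # (0x24 and 0x44 are the v1 and v2 header sizes; presumably a 4-byte
        # version word plus eight offset fields, 4 bytes each in v1 and 8
        # bytes each in v2, giving 4 + 8*4 = 0x24 and 4 + 8*8 = 0x44.)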

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems that lack efficient sparse-file support (Mac OS X and Windows don't support it).")

        avail = fileutil.get_available_space('.', 2**14)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the
        exact same storage index) won't add an entry to the share directory.
        """
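        # (Shares live under shares/<prefix>/<storage-index>/<shnum>, where
        # <prefix> is derived from the leading characters of the encoded
        # storage index, so two indexes with a common prefix share one
        # prefix directory.)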
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space = 10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
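        # (OVERHEAD matches the share file's three 4-byte header fields
        # (version, data size, lease count); LEASE_SIZE is one lease record:
        # a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time.)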
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
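        # Each entry maps shnum -> (test vector, write vector, new_length);
        # empty vectors with new_length=None just create the shares without
        # writing any data.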
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
1080         write = ss.remote_slot_testv_and_readv_and_writev
1081         read = ss.remote_slot_readv
1082         data = [("%d" % i) * 100 for i in range(3)]
1083         rc = write("si1", secrets,
1084                    {0: ([], [(0,data[0])], None),
1085                     1: ([], [(0,data[1])], None),
1086                     2: ([], [(0,data[2])], None),
1087                     }, [])
1088         self.failUnlessEqual(rc, (True, {}))
1089
1090         answer = read("si1", [], [(0, 10)])
1091         self.failUnlessEqual(answer, {0: ["0"*10],
1092                                       1: ["1"*10],
1093                                       2: ["2"*10]})
1094
1095     def compare_leases_without_timestamps(self, leases_a, leases_b):
1096         self.failUnlessEqual(len(leases_a), len(leases_b))
1097         for i in range(len(leases_a)):
1098             a = leases_a[i]
1099             b = leases_b[i]
1100             self.failUnlessEqual(a.owner_num,       b.owner_num)
1101             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1102             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1103             self.failUnlessEqual(a.nodeid,          b.nodeid)
1104
1105     def compare_leases(self, leases_a, leases_b):
1106         self.failUnlessEqual(len(leases_a), len(leases_b))
1107         for i in range(len(leases_a)):
1108             a = leases_a[i]
1109             b = leases_b[i]
1110             self.failUnlessEqual(a.owner_num,       b.owner_num)
1111             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1112             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1113             self.failUnlessEqual(a.nodeid,          b.nodeid)
1114             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1115
1116     def test_leases(self):
1117         ss = self.create("test_leases")
1118         def secrets(n):
1119             return ( self.write_enabler("we1"),
1120                      self.renew_secret("we1-%d" % n),
1121                      self.cancel_secret("we1-%d" % n) )
1122         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1123         write = ss.remote_slot_testv_and_readv_and_writev
1124         read = ss.remote_slot_readv
1125         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1126         self.failUnlessEqual(rc, (True, {}))
1127
1128         # create a random non-numeric file in the bucket directory, to
1129         # exercise the code that's supposed to ignore those.
1130         bucket_dir = os.path.join(self.workdir("test_leases"),
1131                                   "shares", storage_index_to_dir("si1"))
1132         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1133         f.write("you ought to be ignoring me\n")
1134         f.close()
1135
1136         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1137         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1138
1139         # add-lease on a missing storage index is silently ignored
1140         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1141
1142         # re-allocate the slots and use the same secrets, that should update
1143         # the lease
1144         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1145         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1146
1147         # renew it directly
1148         ss.remote_renew_lease("si1", secrets(0)[1])
1149         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1150
1151         # now allocate them with a bunch of different secrets, to trigger the
1152         # extended lease code. Use add_lease for one of them.
1153         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1154         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1155         secrets2 = secrets(2)
1156         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1157         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1158         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1159         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1160         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1161
1162         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1163
1164         # cancel one of them
1165         ss.remote_cancel_lease("si1", secrets(5)[2])
1166         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1167
1168         all_leases = list(s0.get_leases())
1169         # and write enough data to expand the container, forcing the server
1170         # to move the leases
1171         write("si1", secrets(0),
1172               {0: ([], [(0,data)], 200), },
1173               [])
1174
1175         # read back the leases, make sure they're still intact.
1176         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1177
1178         ss.remote_renew_lease("si1", secrets(0)[1])
1179         ss.remote_renew_lease("si1", secrets(1)[1])
1180         ss.remote_renew_lease("si1", secrets(2)[1])
1181         ss.remote_renew_lease("si1", secrets(3)[1])
1182         ss.remote_renew_lease("si1", secrets(4)[1])
1183         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1184         # get a new copy of the leases, with the current timestamps. Reading
1185         # data and failing to renew/cancel leases should leave the timestamps
1186         # alone.
1187         all_leases = list(s0.get_leases())
1188         # renewing with a bogus token should prompt an error message
1189
1190         # examine the exception thus raised, make sure the old nodeid is
1191         # present, to provide for share migration
1192         e = self.failUnlessRaises(IndexError,
1193                                   ss.remote_renew_lease, "si1",
1194                                   secrets(20)[1])
1195         e_s = str(e)
1196         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1197         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1198         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1199
1200         # same for cancelling
1201         self.failUnlessRaises(IndexError,
1202                               ss.remote_cancel_lease, "si1",
1203                               secrets(20)[2])
1204         self.compare_leases(all_leases, list(s0.get_leases()))
1205
1206         # reading shares should not modify the timestamp
1207         read("si1", [], [(0,200)])
1208         self.compare_leases(all_leases, list(s0.get_leases()))
1209
1210         write("si1", secrets(0),
1211               {0: ([], [(200, "make me bigger")], None)}, [])
1212         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1213
1214         write("si1", secrets(0),
1215               {0: ([], [(500, "make me really bigger")], None)}, [])
1216         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1217
1218         # now cancel them all
1219         ss.remote_cancel_lease("si1", secrets(0)[2])
1220         ss.remote_cancel_lease("si1", secrets(1)[2])
1221         ss.remote_cancel_lease("si1", secrets(2)[2])
1222         ss.remote_cancel_lease("si1", secrets(3)[2])
1223
1224         # the slot should still be there
1225         remaining_shares = read("si1", [], [(0,10)])
1226         self.failUnlessEqual(len(remaining_shares), 1)
1227         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1228
1229         # cancelling a non-existent lease should raise an IndexError
1230         self.failUnlessRaises(IndexError,
1231                               ss.remote_cancel_lease, "si1", "nonsecret")
1232
1233         # and the slot should still be there
1234         remaining_shares = read("si1", [], [(0,10)])
1235         self.failUnlessEqual(len(remaining_shares), 1)
1236         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1237
1238         ss.remote_cancel_lease("si1", secrets(4)[2])
1239         # now the slot should be gone
1240         no_shares = read("si1", [], [(0,10)])
1241         self.failUnlessEqual(no_shares, {})
1242
1243         # cancelling a lease on a non-existent share should raise an IndexError
1244         self.failUnlessRaises(IndexError,
1245                               ss.remote_cancel_lease, "si2", "nonsecret")
1246
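    # test_remove (below) leans on the slot convention that a writev with
    # new_length=0 deletes the share in question. A minimal sketch, assuming
    # 'secrets' is the usual (write-enabler, renew-secret, cancel-secret)
    # triple:
    #
    #   writev("si1", secrets, {0: ([], [], 0)}, [])
    #
    # removes share 0 of "si1" and leaves any other shares untouched.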
1247     def test_remove(self):
1248         ss = self.create("test_remove")
1249         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1250                       set([0,1,2]), 100)
1251         readv = ss.remote_slot_readv
1252         writev = ss.remote_slot_testv_and_readv_and_writev
1253         secrets = ( self.write_enabler("we1"),
1254                     self.renew_secret("we1"),
1255                     self.cancel_secret("we1") )
1256         # delete sh0 by setting its size to zero
1257         answer = writev("si1", secrets,
1258                         {0: ([], [], 0)},
1259                         [])
1260         # the answer should mention all the shares that existed before the
1261         # write
1262         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1263         # but a new read should show only sh1 and sh2
1264         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1265                              {1: [""], 2: [""]})
1266
1267         # delete sh1 by setting its size to zero
1268         answer = writev("si1", secrets,
1269                         {1: ([], [], 0)},
1270                         [])
1271         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1272         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1273                              {2: [""]})
1274
1275         # delete sh2 by setting its size to zero
1276         answer = writev("si1", secrets,
1277                         {2: ([], [], 0)},
1278                         [])
1279         self.failUnlessEqual(answer, (True, {2:[]}) )
1280         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1281                              {})
1282         # and the bucket directory should now be gone
1283         si = base32.b2a("si1")
1284         # note: this is a detail of the storage server implementation, and
1285         # may change in the future
1286         prefix = si[:2]
1287         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1288         bucketdir = os.path.join(prefixdir, si)
1289         self.failUnless(os.path.exists(prefixdir), prefixdir)
1290         self.failIf(os.path.exists(bucketdir), bucketdir)
1291
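# A minimal sketch (illustrative only; the real mapping is
# storage_index_to_dir()) of the on-disk layout test_remove pokes at:
# shares live under BASEDIR/shares/<first two base32 chars>/<base32 SI>/.
def _example_bucket_path(basedir, si):
    # not used by the tests; mirrors the prefixdir/bucketdir paths above
    s = base32.b2a(si)
    return os.path.join(basedir, "shares", s[:2], s)
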
1292 class Stats(unittest.TestCase):
1293
1294     def setUp(self):
1295         self.sparent = LoggingServiceParent()
1296         self._lease_secret = itertools.count()
1297     def tearDown(self):
1298         return self.sparent.stopService()
1299
1300     def workdir(self, name):
1301         basedir = os.path.join("storage", "Server", name)
1302         return basedir
1303
1304     def create(self, name):
1305         workdir = self.workdir(name)
1306         ss = StorageServer(workdir, "\x00" * 20)
1307         ss.setServiceParent(self.sparent)
1308         return ss
1309
1310     def test_latencies(self):
1311         ss = self.create("test_latencies")
1312         for i in range(10000):
1313             ss.add_latency("allocate", 1.0 * i)
1314         for i in range(1000):
1315             ss.add_latency("renew", 1.0 * i)
1316         for i in range(10):
1317             ss.add_latency("cancel", 2.0 * i)
1318         ss.add_latency("get", 5.0)
1319
1320         output = ss.get_latencies()
1321
1322         self.failUnlessEqual(sorted(output.keys()),
1323                              sorted(["allocate", "renew", "cancel", "get"]))
1324         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1325         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1326         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1327         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1328         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1329         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1330         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1331         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1332         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1333
1334         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1335         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1336         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1337         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1338         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1339         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1340         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1341         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1342         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1343
1344         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1345         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1346         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1, output)
1347         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1348         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1349         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1350         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1351         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1352         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1353
1354         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1355         self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1356         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1357         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1358         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1359         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1360         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1361         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1362         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1363
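# The percentile assertions above assume a simple sorted-sample read-out,
# and rely on StorageServer keeping only the most recent 1000 samples per
# category (hence the allocate mean of 9500 after 10000 adds). A minimal
# sketch of that read-out (an assumption about get_latencies(), not its
# actual code):
def _example_percentile(samples, fraction):
    # _example_percentile(range(1000), 0.5) -> 500
    ordered = sorted(samples)
    return ordered[int(len(ordered) * fraction)]
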
1364 def remove_tags(s):
1365     s = re.sub(r'<[^>]*>', ' ', s)
1366     s = re.sub(r'\s+', ' ', s)
1367     return s
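# e.g. remove_tags("<p>Total <b>buckets</b>: 0</p>") == " Total buckets : 0 "
# (each tag becomes a single space, then runs of whitespace collapse)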
1368
1369 class MyBucketCountingCrawler(BucketCountingCrawler):
1370     def finished_prefix(self, cycle, prefix):
1371         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1372         if self.hook_ds:
1373             d = self.hook_ds.pop(0)
1374             d.callback(None)
1375
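# MyBucketCountingCrawler fires one pre-loaded Deferred per finished prefix:
# a test fills hook_ds with Deferreds and chains its assertions onto them
# (see test_bucket_counter_eta below), so each check runs at a known point
# in the crawl instead of polling.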
1376 class MyStorageServer(StorageServer):
1377     def add_bucket_counter(self):
1378         statefile = os.path.join(self.storedir, "bucket_counter.state")
1379         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1380         self.bucket_counter.setServiceParent(self)
1381
1382 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1383
1384     def setUp(self):
1385         self.s = service.MultiService()
1386         self.s.startService()
1387     def tearDown(self):
1388         return self.s.stopService()
1389
1390     def test_bucket_counter(self):
1391         basedir = "storage/BucketCounter/bucket_counter"
1392         fileutil.make_dirs(basedir)
1393         ss = StorageServer(basedir, "\x00" * 20)
1394         # to make sure we capture the bucket-counting-crawler in the middle
1395         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1396         # also make it start sooner than usual.
1397         ss.bucket_counter.slow_start = 0
1398         orig_cpu_slice = ss.bucket_counter.cpu_slice
1399         ss.bucket_counter.cpu_slice = 0
1400         ss.setServiceParent(self.s)
1401
1402         w = StorageStatus(ss)
1403
1404         # this sample is before the crawler has started doing anything
1405         html = w.renderSynchronously()
1406         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1407         s = remove_tags(html)
1408         self.failUnlessIn("Accepting new shares: Yes", s)
1409         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1410         self.failUnlessIn("Total buckets: Not computed yet", s)
1411         self.failUnlessIn("Next crawl in", s)
1412
1413         # give the bucket-counting-crawler one tick to get started. The
1414         # cpu_slice=0 will force it to yield right after it processes the
1415         # first prefix
1416
1417         d = fireEventually()
1418         def _check(ignored):
1419             # are we really right after the first prefix?
1420             state = ss.bucket_counter.get_state()
1421             if state["last-complete-prefix"] is None:
1422                 d2 = fireEventually()
1423                 d2.addCallback(_check)
1424                 return d2
1425             self.failUnlessEqual(state["last-complete-prefix"],
1426                                  ss.bucket_counter.prefixes[0])
1427             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1428             html = w.renderSynchronously()
1429             s = remove_tags(html)
1430             self.failUnlessIn(" Current crawl ", s)
1431             self.failUnlessIn(" (next work in ", s)
1432         d.addCallback(_check)
1433
1434         # now give it enough time to complete a full cycle
1435         def _watch():
1436             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1437         d.addCallback(lambda ignored: self.poll(_watch))
1438         def _check2(ignored):
1439             ss.bucket_counter.cpu_slice = orig_cpu_slice
1440             html = w.renderSynchronously()
1441             s = remove_tags(html)
1442             self.failUnlessIn("Total buckets: 0 (the number of", s)
1443             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1444         d.addCallback(_check2)
1445         return d
1446
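    # The self-re-arming _check/_after_first_prefix callbacks in these tests
    # are a minimal polling idiom: fireEventually() waits one reactor turn,
    # and the callback re-registers itself until the crawler reports a
    # last-complete-prefix.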
1447     def test_bucket_counter_cleanup(self):
1448         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1449         fileutil.make_dirs(basedir)
1450         ss = StorageServer(basedir, "\x00" * 20)
1451         # to make sure we capture the bucket-counting-crawler in the middle
1452         # of a cycle, we reach in and reduce its maximum slice time to 0.
1453         ss.bucket_counter.slow_start = 0
1454         orig_cpu_slice = ss.bucket_counter.cpu_slice
1455         ss.bucket_counter.cpu_slice = 0
1456         ss.setServiceParent(self.s)
1457
1458         d = fireEventually()
1459
1460         def _after_first_prefix(ignored):
1461             state = ss.bucket_counter.state
1462             if state["last-complete-prefix"] is None:
1463                 d2 = fireEventually()
1464                 d2.addCallback(_after_first_prefix)
1465                 return d2
1466             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1467             # now sneak in and mess with its state, to make sure it cleans up
1468             # properly at the end of the cycle
1469             self.failUnlessEqual(state["last-complete-prefix"],
1470                                  ss.bucket_counter.prefixes[0])
1471             state["bucket-counts"][-12] = {}
1472             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1473             ss.bucket_counter.save_state()
1474         d.addCallback(_after_first_prefix)
1475
1476         # now give it enough time to complete a cycle
1477         def _watch():
1478             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1479         d.addCallback(lambda ignored: self.poll(_watch))
1480         def _check2(ignored):
1481             ss.bucket_counter.cpu_slice = orig_cpu_slice
1482             s = ss.bucket_counter.get_state()
1483             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1484             self.failIf("bogusprefix!" in s["storage-index-samples"],
1485                         s["storage-index-samples"].keys())
1486         d.addCallback(_check2)
1487         return d
1488
1489     def test_bucket_counter_eta(self):
1490         basedir = "storage/BucketCounter/bucket_counter_eta"
1491         fileutil.make_dirs(basedir)
1492         ss = MyStorageServer(basedir, "\x00" * 20)
1493         ss.bucket_counter.slow_start = 0
1494         # these will be fired inside finished_prefix()
1495         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1496         w = StorageStatus(ss)
1497
1498         d = defer.Deferred()
1499
1500         def _check_1(ignored):
1501             # no ETA is available yet
1502             html = w.renderSynchronously()
1503             s = remove_tags(html)
1504             self.failUnlessIn("complete (next work", s)
1505
1506         def _check_2(ignored):
1507             # one prefix has finished, so an ETA based upon that elapsed time
1508             # should be available.
1509             html = w.renderSynchronously()
1510             s = remove_tags(html)
1511             self.failUnlessIn("complete (ETA ", s)
1512
1513         def _check_3(ignored):
1514             # two prefixes have finished
1515             html = w.renderSynchronously()
1516             s = remove_tags(html)
1517             self.failUnlessIn("complete (ETA ", s)
1518             d.callback("done")
1519
1520         hooks[0].addCallback(_check_1).addErrback(d.errback)
1521         hooks[1].addCallback(_check_2).addErrback(d.errback)
1522         hooks[2].addCallback(_check_3).addErrback(d.errback)
1523
1524         ss.setServiceParent(self.s)
1525         return d
1526
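    # A sketch of the ETA arithmetic implied by these checks (an assumption
    # about the status page, not its actual code): with N prefixes total and
    # k finished in t seconds, remaining time ~= t * (N - k) / k, which is
    # undefined until at least one prefix has finished -- hence no ETA in
    # _check_1 and an ETA in _check_2 and _check_3.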
1527 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1528     stop_after_first_bucket = False
1529     def process_bucket(self, *args, **kwargs):
1530         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1531         if self.stop_after_first_bucket:
1532             self.stop_after_first_bucket = False
1533             self.cpu_slice = -1.0
1534     def yielding(self, sleep_time):
1535         if not self.stop_after_first_bucket:
1536             self.cpu_slice = 500
1537
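# Setting cpu_slice to -1.0 makes the crawler believe every time slice is
# already over budget, so it yields right after the first bucket; yielding()
# then restores a large slice so the remainder of the cycle runs without
# further interruption.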
1538 class BrokenStatResults:
1539     pass
1540 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1541     def stat(self, fn):
1542         s = os.stat(fn)
1543         bsr = BrokenStatResults()
1544         for attrname in dir(s):
1545             if attrname.startswith("_"):
1546                 continue
1547             if attrname == "st_blocks":
1548                 continue
1549             setattr(bsr, attrname, getattr(s, attrname))
1550         return bsr
1551
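# Without an st_blocks attribute the expirer cannot compute real disk usage,
# so it is expected to fall back to the share's byte size: diskbytes ==
# sharebytes (exercised in test_no_st_blocks below).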
1552 class InstrumentedStorageServer(StorageServer):
1553     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1554 class No_ST_BLOCKS_StorageServer(StorageServer):
1555     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1556
1557 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1558
1559     def setUp(self):
1560         self.s = service.MultiService()
1561         self.s.startService()
1562     def tearDown(self):
1563         return self.s.stopService()
1564
1565     def make_shares(self, ss):
1566         def make(si):
1567             return (si, hashutil.tagged_hash("renew", si),
1568                     hashutil.tagged_hash("cancel", si))
1569         def make_mutable(si):
1570             return (si, hashutil.tagged_hash("renew", si),
1571                     hashutil.tagged_hash("cancel", si),
1572                     hashutil.tagged_hash("write-enabler", si))
1573         def make_extra_lease(si, num):
1574             return (hashutil.tagged_hash("renew-%d" % num, si),
1575                     hashutil.tagged_hash("cancel-%d" % num, si))
1576
1577         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1578         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1579         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1580         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1581         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1582         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1583         sharenums = [0]
1584         canary = FakeCanary()
1585         # note: 'tahoe debug dump-share' will not handle this file, since the
1586         # inner contents are not a valid CHK share
1587         data = "\xff" * 1000
1588
1589         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1590                                          1000, canary)
1591         w[0].remote_write(0, data)
1592         w[0].remote_close()
1593
1594         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1595                                          1000, canary)
1596         w[0].remote_write(0, data)
1597         w[0].remote_close()
1598         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1599
1600         writev = ss.remote_slot_testv_and_readv_and_writev
1601         writev(mutable_si_2, (we2, rs2, cs2),
1602                {0: ([], [(0,data)], len(data))}, [])
1603         writev(mutable_si_3, (we3, rs3, cs3),
1604                {0: ([], [(0,data)], len(data))}, [])
1605         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1606
1607         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1608         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1609         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1610
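    # The fixture built by make_shares() is: four single-share buckets, two
    # immutable and two mutable, each holding 1000 bytes of dummy data, with
    # immutable_si_1 and mutable_si_3 carrying a second lease (six leases,
    # six renew/cancel secrets in all).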
1611     def test_basic(self):
1612         basedir = "storage/LeaseCrawler/basic"
1613         fileutil.make_dirs(basedir)
1614         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1615         # make it start sooner than usual.
1616         lc = ss.lease_checker
1617         lc.slow_start = 0
1618         lc.cpu_slice = 500
1619         lc.stop_after_first_bucket = True
1620         webstatus = StorageStatus(ss)
1621
1622         # create a few shares, with some leases on them
1623         self.make_shares(ss)
1624         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1625
1626         # add a non-sharefile to exercise another code path
1627         fn = os.path.join(ss.sharedir,
1628                           storage_index_to_dir(immutable_si_0),
1629                           "not-a-share")
1630         f = open(fn, "wb")
1631         f.write("I am not a share.\n")
1632         f.close()
1633
1634         # this is before the crawl has started, so we're not in a cycle yet
1635         initial_state = lc.get_state()
1636         self.failIf(lc.get_progress()["cycle-in-progress"])
1637         self.failIfIn("cycle-to-date", initial_state)
1638         self.failIfIn("estimated-remaining-cycle", initial_state)
1639         self.failIfIn("estimated-current-cycle", initial_state)
1640         self.failUnlessIn("history", initial_state)
1641         self.failUnlessEqual(initial_state["history"], {})
1642
1643         ss.setServiceParent(self.s)
1644
1645         DAY = 24*60*60
1646
1647         d = fireEventually()
1648
1649         # now examine the state right after the first bucket has been
1650         # processed.
1651         def _after_first_bucket(ignored):
1652             initial_state = lc.get_state()
1653             if "cycle-to-date" not in initial_state:
1654                 d2 = fireEventually()
1655                 d2.addCallback(_after_first_bucket)
1656                 return d2
1657             self.failUnlessIn("cycle-to-date", initial_state)
1658             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1659             self.failUnlessIn("estimated-current-cycle", initial_state)
1660             self.failUnlessIn("history", initial_state)
1661             self.failUnlessEqual(initial_state["history"], {})
1662
1663             so_far = initial_state["cycle-to-date"]
1664             self.failUnlessEqual(so_far["expiration-enabled"], False)
1665             self.failUnlessIn("configured-expiration-mode", so_far)
1666             self.failUnlessIn("lease-age-histogram", so_far)
1667             lah = so_far["lease-age-histogram"]
1668             self.failUnlessEqual(type(lah), list)
1669             self.failUnlessEqual(len(lah), 1)
1670             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1671             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1672             self.failUnlessEqual(so_far["corrupt-shares"], [])
1673             sr1 = so_far["space-recovered"]
1674             self.failUnlessEqual(sr1["examined-buckets"], 1)
1675             self.failUnlessEqual(sr1["examined-shares"], 1)
1676             self.failUnlessEqual(sr1["actual-shares"], 0)
1677             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1678             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1679             left = initial_state["estimated-remaining-cycle"]
1680             sr2 = left["space-recovered"]
1681             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1682             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1683             self.failIfEqual(sr2["actual-shares"], None)
1684             self.failIfEqual(sr2["configured-diskbytes"], None)
1685             self.failIfEqual(sr2["original-sharebytes"], None)
1686         d.addCallback(_after_first_bucket)
1687         d.addCallback(lambda ign: self.render1(webstatus))
1688         def _check_html_in_cycle(html):
1689             s = remove_tags(html)
1690             self.failUnlessIn("So far, this cycle has examined "
1691                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1692             self.failUnlessIn("and has recovered: "
1693                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1694                               "0 B (0 B / 0 B)", s)
1695             self.failUnlessIn("If expiration were enabled, "
1696                               "we would have recovered: "
1697                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1698                               " 0 B (0 B / 0 B) by now", s)
1699             self.failUnlessIn("and the remainder of this cycle "
1700                               "would probably recover: "
1701                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1702                               " 0 B (0 B / 0 B)", s)
1703             self.failUnlessIn("and the whole cycle would probably recover: "
1704                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1705                               " 0 B (0 B / 0 B)", s)
1706             self.failUnlessIn("if we were strictly using each lease's default "
1707                               "31-day lease lifetime", s)
1708             self.failUnlessIn("this cycle would be expected to recover: ", s)
1709         d.addCallback(_check_html_in_cycle)
1710
1711         # wait for the crawler to finish the first cycle. Nothing should have
1712         # been removed.
1713         def _wait():
1714             return lc.get_state()["last-cycle-finished"] is not None
1715         d.addCallback(lambda ign: self.poll(_wait))
1716
1717         def _after_first_cycle(ignored):
1718             s = lc.get_state()
1719             self.failIfIn("cycle-to-date", s)
1720             self.failIfIn("estimated-remaining-cycle", s)
1721             self.failIfIn("estimated-current-cycle", s)
1722             last = s["history"][0]
1723             self.failUnlessIn("cycle-start-finish-times", last)
1724             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1725             self.failUnlessEqual(last["expiration-enabled"], False)
1726             self.failUnlessIn("configured-expiration-mode", last)
1727
1728             self.failUnlessIn("lease-age-histogram", last)
1729             lah = last["lease-age-histogram"]
1730             self.failUnlessEqual(type(lah), list)
1731             self.failUnlessEqual(len(lah), 1)
1732             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1733
1734             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1735             self.failUnlessEqual(last["corrupt-shares"], [])
1736
1737             rec = last["space-recovered"]
1738             self.failUnlessEqual(rec["examined-buckets"], 4)
1739             self.failUnlessEqual(rec["examined-shares"], 4)
1740             self.failUnlessEqual(rec["actual-buckets"], 0)
1741             self.failUnlessEqual(rec["original-buckets"], 0)
1742             self.failUnlessEqual(rec["configured-buckets"], 0)
1743             self.failUnlessEqual(rec["actual-shares"], 0)
1744             self.failUnlessEqual(rec["original-shares"], 0)
1745             self.failUnlessEqual(rec["configured-shares"], 0)
1746             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1747             self.failUnlessEqual(rec["original-diskbytes"], 0)
1748             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1749             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1750             self.failUnlessEqual(rec["original-sharebytes"], 0)
1751             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1752
1753             def _get_sharefile(si):
1754                 return list(ss._iter_share_files(si))[0]
1755             def count_leases(si):
1756                 return len(list(_get_sharefile(si).get_leases()))
1757             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1758             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1759             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1760             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1761         d.addCallback(_after_first_cycle)
1762         d.addCallback(lambda ign: self.render1(webstatus))
1763         def _check_html(html):
1764             s = remove_tags(html)
1765             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1766                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1767             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1768                               "(2 mutable / 2 immutable),", s)
1769             self.failUnlessIn("but expiration was not enabled", s)
1770         d.addCallback(_check_html)
1771         d.addCallback(lambda ign: self.render_json(webstatus))
1772         def _check_json(json):
1773             data = simplejson.loads(json)
1774             self.failUnlessIn("lease-checker", data)
1775             self.failUnlessIn("lease-checker-progress", data)
1776         d.addCallback(_check_json)
1777         return d
1778
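    # (each lease-age-histogram entry appears to be a (min-age, max-age,
    # count) bucket: in test_basic all six leases are under a day old, so
    # the whole histogram collapses to the single bucket (0.0, DAY, 6))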
1779     def backdate_lease(self, sf, renew_secret, new_expire_time):
1780         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1781         # "renew" a lease with a new_expire_time that is older than what the
1782         # current lease has), so we have to reach inside it.
1783         for i,lease in enumerate(sf.get_leases()):
1784             if lease.renew_secret == renew_secret:
1785                 lease.expiration_time = new_expire_time
1786                 f = open(sf.home, 'rb+')
1787                 sf._write_lease_record(f, i, lease)
1788                 f.close()
1789                 return
1790         raise IndexError("unable to renew non-existent lease")
1791
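    # e.g. in test_expire_age below, backdate_lease(sf, rs, now - 1000)
    # rewrites the matching lease record in place so the "age"-mode expirer
    # (configured for 2000s) treats that lease as already expired.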
1792     def test_expire_age(self):
1793         basedir = "storage/LeaseCrawler/expire_age"
1794         fileutil.make_dirs(basedir)
1795         # setting expiration_override_lease_duration to 2000 means that
1796         # any lease which is more than 2000s old will be expired.
1797         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1798                                        expiration_enabled=True,
1799                                        expiration_mode="age",
1800                                        expiration_override_lease_duration=2000)
1801         # make it start sooner than usual.
1802         lc = ss.lease_checker
1803         lc.slow_start = 0
1804         lc.stop_after_first_bucket = True
1805         webstatus = StorageStatus(ss)
1806
1807         # create a few shares, with some leases on them
1808         self.make_shares(ss)
1809         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1810
1811         def count_shares(si):
1812             return len(list(ss._iter_share_files(si)))
1813         def _get_sharefile(si):
1814             return list(ss._iter_share_files(si))[0]
1815         def count_leases(si):
1816             return len(list(_get_sharefile(si).get_leases()))
1817
1818         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1819         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1820         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1821         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1822         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1823         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1824         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1825         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1826
1827         # artificially crank back the expiration time on the first lease of
1828         # each share, to make it look like it expired already (age=1000s).
1829         # Some shares have an extra lease which is set to expire at the
1830         # default time in 31 days from now (age=31days). We then run the
1831         # crawler, which will expire the first lease, making some shares get
1832         # deleted and others stay alive (with one remaining lease)
1833         now = time.time()
1834
1835         sf0 = _get_sharefile(immutable_si_0)
1836         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1837         sf0_size = os.stat(sf0.home).st_size
1838
1839         # immutable_si_1 gets an extra lease
1840         sf1 = _get_sharefile(immutable_si_1)
1841         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1842
1843         sf2 = _get_sharefile(mutable_si_2)
1844         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1845         sf2_size = os.stat(sf2.home).st_size
1846
1847         # mutable_si_3 gets an extra lease
1848         sf3 = _get_sharefile(mutable_si_3)
1849         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1850
1851         ss.setServiceParent(self.s)
1852
1853         d = fireEventually()
1854         # examine the state right after the first bucket has been processed
1855         def _after_first_bucket(ignored):
1856             p = lc.get_progress()
1857             if not p["cycle-in-progress"]:
1858                 d2 = fireEventually()
1859                 d2.addCallback(_after_first_bucket)
1860                 return d2
1861         d.addCallback(_after_first_bucket)
1862         d.addCallback(lambda ign: self.render1(webstatus))
1863         def _check_html_in_cycle(html):
1864             s = remove_tags(html)
1865             # the first bucket encountered gets deleted, and its prefix
1866             # happens to be about 1/5th of the way through the ring, so the
1867             # predictor thinks we'll have 5 shares and that we'll delete them
1868             # all. This part of the test depends upon the SIs landing right
1869             # where they do now.
1870             self.failUnlessIn("The remainder of this cycle is expected to "
1871                               "recover: 4 shares, 4 buckets", s)
1872             self.failUnlessIn("The whole cycle is expected to examine "
1873                               "5 shares in 5 buckets and to recover: "
1874                               "5 shares, 5 buckets", s)
1875         d.addCallback(_check_html_in_cycle)
1876
1877         # wait for the crawler to finish the first cycle. Two shares should
1878         # have been removed
1879         def _wait():
1880             return lc.get_state()["last-cycle-finished"] is not None
1881         d.addCallback(lambda ign: self.poll(_wait))
1882
1883         def _after_first_cycle(ignored):
1884             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1885             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1886             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1887             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1888             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1889             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1890
1891             s = lc.get_state()
1892             last = s["history"][0]
1893
1894             self.failUnlessEqual(last["expiration-enabled"], True)
1895             self.failUnlessEqual(last["configured-expiration-mode"],
1896                                  ("age", 2000, None, ("mutable", "immutable")))
1897             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1898
1899             rec = last["space-recovered"]
1900             self.failUnlessEqual(rec["examined-buckets"], 4)
1901             self.failUnlessEqual(rec["examined-shares"], 4)
1902             self.failUnlessEqual(rec["actual-buckets"], 2)
1903             self.failUnlessEqual(rec["original-buckets"], 2)
1904             self.failUnlessEqual(rec["configured-buckets"], 2)
1905             self.failUnlessEqual(rec["actual-shares"], 2)
1906             self.failUnlessEqual(rec["original-shares"], 2)
1907             self.failUnlessEqual(rec["configured-shares"], 2)
1908             size = sf0_size + sf2_size
1909             self.failUnlessEqual(rec["actual-sharebytes"], size)
1910             self.failUnlessEqual(rec["original-sharebytes"], size)
1911             self.failUnlessEqual(rec["configured-sharebytes"], size)
1912             # different platforms have different notions of "blocks used by
1913             # this file", so merely assert that it's non-negative
1914             self.failUnless(rec["actual-diskbytes"] >= 0,
1915                             rec["actual-diskbytes"])
1916             self.failUnless(rec["original-diskbytes"] >= 0,
1917                             rec["original-diskbytes"])
1918             self.failUnless(rec["configured-diskbytes"] >= 0,
1919                             rec["configured-diskbytes"])
1920         d.addCallback(_after_first_cycle)
1921         d.addCallback(lambda ign: self.render1(webstatus))
1922         def _check_html(html):
1923             s = remove_tags(html)
1924             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1925             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1926             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1927         d.addCallback(_check_html)
1928         return d
1929
1930     def test_expire_cutoff_date(self):
1931         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1932         fileutil.make_dirs(basedir)
1933         # setting cutoff-date to 2000 seconds ago means that any lease which
1934         # is more than 2000s old will be expired.
1935         now = time.time()
1936         then = int(now - 2000)
1937         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1938                                        expiration_enabled=True,
1939                                        expiration_mode="cutoff-date",
1940                                        expiration_cutoff_date=then)
1941         # make it start sooner than usual.
1942         lc = ss.lease_checker
1943         lc.slow_start = 0
1944         lc.stop_after_first_bucket = True
1945         webstatus = StorageStatus(ss)
1946
1947         # create a few shares, with some leases on them
1948         self.make_shares(ss)
1949         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1950
1951         def count_shares(si):
1952             return len(list(ss._iter_share_files(si)))
1953         def _get_sharefile(si):
1954             return list(ss._iter_share_files(si))[0]
1955         def count_leases(si):
1956             return len(list(_get_sharefile(si).get_leases()))
1957
1958         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1959         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1960         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1961         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1962         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1963         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1964         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1965         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1966
1967         # artificially crank back the expiration time on the first lease of
1968         # each share, to make it look like it was renewed 3000s ago. To achieve
1969         # this, we need to set the expiration time to now-3000+31days. This
1970         # will change when the lease format is improved to contain both
1971         # create/renew time and duration.
1972         new_expiration_time = now - 3000 + 31*24*60*60
1973
1974         # Some shares have an extra lease which is set to expire at the
1975         # default time in 31 days from now (age=31days). We then run the
1976         # crawler, which will expire the first lease, making some shares get
1977         # deleted and others stay alive (with one remaining lease)
1978
1979         sf0 = _get_sharefile(immutable_si_0)
1980         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1981         sf0_size = os.stat(sf0.home).st_size
1982
1983         # immutable_si_1 gets an extra lease
1984         sf1 = _get_sharefile(immutable_si_1)
1985         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1986
1987         sf2 = _get_sharefile(mutable_si_2)
1988         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1989         sf2_size = os.stat(sf2.home).st_size
1990
1991         # mutable_si_3 gets an extra lease
1992         sf3 = _get_sharefile(mutable_si_3)
1993         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1994
1995         ss.setServiceParent(self.s)
1996
1997         d = fireEventually()
1998         # examine the state right after the first bucket has been processed
1999         def _after_first_bucket(ignored):
2000             p = lc.get_progress()
2001             if not p["cycle-in-progress"]:
2002                 d2 = fireEventually()
2003                 d2.addCallback(_after_first_bucket)
2004                 return d2
2005         d.addCallback(_after_first_bucket)
2006         d.addCallback(lambda ign: self.render1(webstatus))
2007         def _check_html_in_cycle(html):
2008             s = remove_tags(html)
2009             # the first bucket encountered gets deleted, and its prefix
2010             # happens to be about 1/5th of the way through the ring, so the
2011             # predictor thinks we'll have 5 shares and that we'll delete them
2012             # all. This part of the test depends upon the SIs landing right
2013             # where they do now.
2014             self.failUnlessIn("The remainder of this cycle is expected to "
2015                               "recover: 4 shares, 4 buckets", s)
2016             self.failUnlessIn("The whole cycle is expected to examine "
2017                               "5 shares in 5 buckets and to recover: "
2018                               "5 shares, 5 buckets", s)
2019         d.addCallback(_check_html_in_cycle)
2020
2021         # wait for the crawler to finish the first cycle. Two shares should
2022         # have been removed
2023         def _wait():
2024             return lc.get_state()["last-cycle-finished"] is not None
2025         d.addCallback(lambda ign: self.poll(_wait))
2026
2027         def _after_first_cycle(ignored):
2028             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2029             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2030             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2031             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2032             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2033             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2034
2035             s = lc.get_state()
2036             last = s["history"][0]
2037
2038             self.failUnlessEqual(last["expiration-enabled"], True)
2039             self.failUnlessEqual(last["configured-expiration-mode"],
2040                                  ("cutoff-date", None, then,
2041                                   ("mutable", "immutable")))
2042             self.failUnlessEqual(last["leases-per-share-histogram"],
2043                                  {1: 2, 2: 2})
2044
2045             rec = last["space-recovered"]
2046             self.failUnlessEqual(rec["examined-buckets"], 4)
2047             self.failUnlessEqual(rec["examined-shares"], 4)
2048             self.failUnlessEqual(rec["actual-buckets"], 2)
2049             self.failUnlessEqual(rec["original-buckets"], 0)
2050             self.failUnlessEqual(rec["configured-buckets"], 2)
2051             self.failUnlessEqual(rec["actual-shares"], 2)
2052             self.failUnlessEqual(rec["original-shares"], 0)
2053             self.failUnlessEqual(rec["configured-shares"], 2)
2054             size = sf0_size + sf2_size
2055             self.failUnlessEqual(rec["actual-sharebytes"], size)
2056             self.failUnlessEqual(rec["original-sharebytes"], 0)
2057             self.failUnlessEqual(rec["configured-sharebytes"], size)
2058             # different platforms have different notions of "blocks used by
2059             # this file", so merely assert that it's non-negative
2060             self.failUnless(rec["actual-diskbytes"] >= 0,
2061                             rec["actual-diskbytes"])
2062             self.failUnless(rec["original-diskbytes"] >= 0,
2063                             rec["original-diskbytes"])
2064             self.failUnless(rec["configured-diskbytes"] >= 0,
2065                             rec["configured-diskbytes"])
2066         d.addCallback(_after_first_cycle)
2067         d.addCallback(lambda ign: self.render1(webstatus))
2068         def _check_html(html):
2069             s = remove_tags(html)
2070             self.failUnlessIn("Expiration Enabled:"
2071                               " expired leases will be removed", s)
2072             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2073             substr = "Leases created or last renewed before %s will be considered expired." % date
2074             self.failUnlessIn(substr, s)
2075             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2076         d.addCallback(_check_html)
2077         return d
2078
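    # Unlike test_expire_age, the "original-*" counts here are all 0: under
    # the default 31-day lease lifetime, a lease renewed only 3000s ago
    # would not have expired yet, so the original-lifetime predictor
    # recovers nothing.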
2079     def test_only_immutable(self):
2080         basedir = "storage/LeaseCrawler/only_immutable"
2081         fileutil.make_dirs(basedir)
2082         now = time.time()
2083         then = int(now - 2000)
2084         ss = StorageServer(basedir, "\x00" * 20,
2085                            expiration_enabled=True,
2086                            expiration_mode="cutoff-date",
2087                            expiration_cutoff_date=then,
2088                            expiration_sharetypes=("immutable",))
2089         lc = ss.lease_checker
2090         lc.slow_start = 0
2091         webstatus = StorageStatus(ss)
2092
2093         self.make_shares(ss)
2094         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2095         # set all leases to be expirable
2096         new_expiration_time = now - 3000 + 31*24*60*60
2097
2098         def count_shares(si):
2099             return len(list(ss._iter_share_files(si)))
2100         def _get_sharefile(si):
2101             return list(ss._iter_share_files(si))[0]
2102         def count_leases(si):
2103             return len(list(_get_sharefile(si).get_leases()))
2104
2105         sf0 = _get_sharefile(immutable_si_0)
2106         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2107         sf1 = _get_sharefile(immutable_si_1)
2108         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2109         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2110         sf2 = _get_sharefile(mutable_si_2)
2111         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2112         sf3 = _get_sharefile(mutable_si_3)
2113         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2114         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2115
2116         ss.setServiceParent(self.s)
2117         def _wait():
2118             return lc.get_state()["last-cycle-finished"] is not None
2119         d = self.poll(_wait)
2120
2121         def _after_first_cycle(ignored):
2122             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2123             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2124             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2125             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2126             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2127             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2128         d.addCallback(_after_first_cycle)
2129         d.addCallback(lambda ign: self.render1(webstatus))
2130         def _check_html(html):
2131             s = remove_tags(html)
2132             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2133         d.addCallback(_check_html)
2134         return d
2135
2136     def test_only_mutable(self):
2137         basedir = "storage/LeaseCrawler/only_mutable"
2138         fileutil.make_dirs(basedir)
2139         now = time.time()
2140         then = int(now - 2000)
2141         ss = StorageServer(basedir, "\x00" * 20,
2142                            expiration_enabled=True,
2143                            expiration_mode="cutoff-date",
2144                            expiration_cutoff_date=then,
2145                            expiration_sharetypes=("mutable",))
2146         lc = ss.lease_checker
2147         lc.slow_start = 0
2148         webstatus = StorageStatus(ss)
2149
2150         self.make_shares(ss)
2151         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2152         # set all leases to be expirable
2153         new_expiration_time = now - 3000 + 31*24*60*60
2154
2155         def count_shares(si):
2156             return len(list(ss._iter_share_files(si)))
2157         def _get_sharefile(si):
2158             return list(ss._iter_share_files(si))[0]
2159         def count_leases(si):
2160             return len(list(_get_sharefile(si).get_leases()))
2161
2162         sf0 = _get_sharefile(immutable_si_0)
2163         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2164         sf1 = _get_sharefile(immutable_si_1)
2165         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2166         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2167         sf2 = _get_sharefile(mutable_si_2)
2168         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2169         sf3 = _get_sharefile(mutable_si_3)
2170         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2171         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2172
2173         ss.setServiceParent(self.s)
2174         def _wait():
2175             return lc.get_state()["last-cycle-finished"] is not None
2176         d = self.poll(_wait)
2177
2178         def _after_first_cycle(ignored):
2179             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2180             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2181             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2182             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2183             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2184             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2185         d.addCallback(_after_first_cycle)
2186         d.addCallback(lambda ign: self.render1(webstatus))
2187         def _check_html(html):
2188             s = remove_tags(html)
2189             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2190         d.addCallback(_check_html)
2191         return d
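    # test_only_immutable and test_only_mutable are mirror images: the same
    # fully-expirable fixture, with expiration_sharetypes restricting which
    # container type the crawler is allowed to delete.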
2192
2193     def test_bad_mode(self):
2194         basedir = "storage/LeaseCrawler/bad_mode"
2195         fileutil.make_dirs(basedir)
2196         e = self.failUnlessRaises(ValueError,
2197                                   StorageServer, basedir, "\x00" * 20,
2198                                   expiration_mode="bogus")
2199         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2200
2201     def test_parse_duration(self):
2202         DAY = 24*60*60
2203         MONTH = 31*DAY
2204         YEAR = 365*DAY
2205         p = time_format.parse_duration
2206         self.failUnlessEqual(p("7days"), 7*DAY)
2207         self.failUnlessEqual(p("31day"), 31*DAY)
2208         self.failUnlessEqual(p("60 days"), 60*DAY)
2209         self.failUnlessEqual(p("2mo"), 2*MONTH)
2210         self.failUnlessEqual(p("3 month"), 3*MONTH)
2211         self.failUnlessEqual(p("2years"), 2*YEAR)
2212         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2213         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2214
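    # note the calendar conventions parse_duration assumes: a month is
    # always 31 days and a year 365 days, so "2mo" is 62 days and a year is
    # not 12 months (372 days).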
2215     def test_parse_date(self):
2216         p = time_format.parse_date
2217         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2218         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2219
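    # 1237334400 == 14321 days * 86400 s/day after the 1970-01-01 epoch,
    # i.e. midnight UTC on 2009-03-18: parse_date interprets its argument
    # as a UTC date.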
2220     def test_limited_history(self):
2221         basedir = "storage/LeaseCrawler/limited_history"
2222         fileutil.make_dirs(basedir)
2223         ss = StorageServer(basedir, "\x00" * 20)
2224         # make it start sooner than usual.
2225         lc = ss.lease_checker
2226         lc.slow_start = 0
2227         lc.cpu_slice = 500
2228
2229         # create a few shares, with some leases on them
2230         self.make_shares(ss)
2231
2232         ss.setServiceParent(self.s)
2233
2234         def _wait_until_15_cycles_done():
2235             last = lc.state["last-cycle-finished"]
2236             if last is not None and last >= 15:
2237                 return True
2238             if lc.timer:
2239                 lc.timer.reset(0)
2240             return False
2241         d = self.poll(_wait_until_15_cycles_done)
2242
2243         def _check(ignored):
2244             s = lc.get_state()
2245             h = s["history"]
2246             self.failUnlessEqual(len(h), 10)
2247             self.failUnlessEqual(max(h.keys()), 15)
2248             self.failUnlessEqual(min(h.keys()), 6)
2249         d.addCallback(_check)
2250         return d
2251
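    # the history dict keeps at most 10 completed cycles: once cycle 15 has
    # finished, cycles 0-5 have been evicted, leaving keys 6..15 inclusive.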
2252     def test_unpredictable_future(self):
2253         basedir = "storage/LeaseCrawler/unpredictable_future"
2254         fileutil.make_dirs(basedir)
2255         ss = StorageServer(basedir, "\x00" * 20)
2256         # make it start sooner than usual.
2257         lc = ss.lease_checker
2258         lc.slow_start = 0
2259         lc.cpu_slice = -1.0 # stop quickly
2260
2261         self.make_shares(ss)
2262
2263         ss.setServiceParent(self.s)
2264
2265         d = fireEventually()
2266         def _check(ignored):
2267             # this should fire after the first bucket is complete, but before
2268             # the first prefix is complete, so the progress-measurer won't
2269             # think we've gotten far enough to raise our percent-complete
2270             # above 0%, triggering the cannot-predict-the-future code in
2271             # expirer.py . This will have to change if/when the
2272             # progress-measurer gets smart enough to count buckets (we'll
2273             # have to interrupt it even earlier, before it's finished the
2274             # first bucket).
2275             s = lc.get_state()
2276             if "cycle-to-date" not in s:
2277                 d2 = fireEventually()
2278                 d2.addCallback(_check)
2279                 return d2
2280             self.failUnlessIn("cycle-to-date", s)
2281             self.failUnlessIn("estimated-remaining-cycle", s)
2282             self.failUnlessIn("estimated-current-cycle", s)
2283
2284             left = s["estimated-remaining-cycle"]["space-recovered"]
2285             self.failUnlessEqual(left["actual-buckets"], None)
2286             self.failUnlessEqual(left["original-buckets"], None)
2287             self.failUnlessEqual(left["configured-buckets"], None)
2288             self.failUnlessEqual(left["actual-shares"], None)
2289             self.failUnlessEqual(left["original-shares"], None)
2290             self.failUnlessEqual(left["configured-shares"], None)
2291             self.failUnlessEqual(left["actual-diskbytes"], None)
2292             self.failUnlessEqual(left["original-diskbytes"], None)
2293             self.failUnlessEqual(left["configured-diskbytes"], None)
2294             self.failUnlessEqual(left["actual-sharebytes"], None)
2295             self.failUnlessEqual(left["original-sharebytes"], None)
2296             self.failUnlessEqual(left["configured-sharebytes"], None)
2297
2298             full = s["estimated-current-cycle"]["space-recovered"]
2299             self.failUnlessEqual(full["actual-buckets"], None)
2300             self.failUnlessEqual(full["original-buckets"], None)
2301             self.failUnlessEqual(full["configured-buckets"], None)
2302             self.failUnlessEqual(full["actual-shares"], None)
2303             self.failUnlessEqual(full["original-shares"], None)
2304             self.failUnlessEqual(full["configured-shares"], None)
2305             self.failUnlessEqual(full["actual-diskbytes"], None)
2306             self.failUnlessEqual(full["original-diskbytes"], None)
2307             self.failUnlessEqual(full["configured-diskbytes"], None)
2308             self.failUnlessEqual(full["actual-sharebytes"], None)
2309             self.failUnlessEqual(full["original-sharebytes"], None)
2310             self.failUnlessEqual(full["configured-sharebytes"], None)
2311
2312         d.addCallback(_check)
2313         return d
2314
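    # (the cannot-predict-the-future path referenced above publishes None
    # for every estimated space-recovered field rather than extrapolating
    # from zero progress)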
    def test_no_st_blocks(self):
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # A negative expiration_override_lease_duration means every lease
        # has already expired, so the "configured-" space-recovered counts
        # will be non-zero.
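        # (A sketch of the fallback this test exercises -- an assumption
        # about the crawler's internals, not a copy of them. On platforms
        # whose os.stat() results carry st_blocks, real disk usage is
        # st_blocks * 512; without it, the apparent file size is the best
        # available guess, which is why _check() below expects
        # diskbytes == sharebytes:
        #
        #   st = os.stat(sharefilename)
        #   sharebytes = st.st_size
        #   if hasattr(st, "st_blocks"):
        #       diskbytes = st.st_blocks * 512
        #   else:
        #       diskbytes = sharebytes
        # )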

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
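        # (The dispatch in question, roughly -- a sketch reconstructed from
        # the behavior described above, not the exact code:
        #
        #   f = open(filename, "rb")
        #   prefix = f.read(32)
        #   f.close()
        #   if prefix == MutableShareFile.MAGIC:
        #       return MutableShareFile(filename)
        #   return ShareFile(filename)  # raises
        #                               # UnknownImmutableContainerVersionError
        #                               # on a bad version field
        # )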

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)
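        # the empty bucket matters for the totals checked below: the crawler
        # walks it (so it counts toward examined-buckets) but finds no shares
        # inside (so it leaves examined-shares alone)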

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
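            # (for example, simplejson.loads(simplejson.dumps({0: (1, 2)}))
            # comes back as {"0": [1, 2]})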
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_no_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = AttributeError()

        # Some platforms may have no disk stats API. Make sure the code can
        # handle that (test runs on all platforms).
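        # (A sketch of the path exercised here -- an assumption about how
        # the server consumes fileutil.get_disk_stats(), not its real code:
        #
        #   try:
        #       stats = fileutil.get_disk_stats(sharedir)
        #   except AttributeError:
        #       stats = None  # no statvfs/win32 API on this platform:
        #                     # sizes render as "?" and
        #                     # get_available_space() returns None
        # )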
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_bad_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = OSError()

        # If the API to get disk stats exists but a call to it fails, then
        # the status should show that no shares will be accepted, and
        # get_available_space() should be 0.
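        # (By contrast with the AttributeError case above, a sketch --
        # again an assumption, not the real code -- of the conservative
        # handling of a present-but-failing API:
        #
        #   try:
        #       stats = fileutil.get_disk_stats(sharedir)
        #   except OSError:
        #       return 0  # the call failed: claim no space, accept nothing
        # )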
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        # use a reserved space bigger than 2**32, to exercise sizes that
        # don't fit in a 32-bit integer
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e9)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 GB (10000000000)", s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)