import time, os.path, platform, stat, re, simplejson, struct

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
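# Stands in for the foolscap canary (a RemoteReference): it records the
# disconnect notifications that callers register, so tests can trigger a
# simulated disconnection by invoking them directly (see
# Server.test_disconnect).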
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

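# A stats provider that silently discards everything it is given, so a
# StorageServer can be constructed without a real stats-gatherer.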
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

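# Wraps a local object behind a callRemote() interface, the way a foolscap
# RemoteReference would: each call is dispatched to the matching remote_*
# method on the target, and the result is wrapped in a Deferred.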
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

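# Tests for the immutable-share side of StorageServer: bucket allocation,
# writes and reads, leases, and corruption advisories.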
class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems without efficient sparse-file support (Mac OS X and Windows lack it).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an existing share (but is not the
        exact same storage index) does not add an entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
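        # (i.e. 4-byte owner number + 32-byte renew secret + 32-byte cancel
        # secret + 4-byte expiration time, matching the LeaseInfo fields
        # used elsewhere in this file)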
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse;
        # the data we read back will therefore be all zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



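# Tests for the mutable-slot side of StorageServer: the
# slot_testv_and_readv_and_writev / slot_readv interface and mutable-share
# leases.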
class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
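        # each entry maps a share number to (test vector, write vector, new
        # container length); here every share gets an empty test vector, no
        # writes, and no resize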
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the comparison operators; the data being compared is '11111'
        # in all cases. test both failing and passing cases, resetting the
        # data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots using the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
1280         # and the bucket directory should now be gone
1281         si = base32.b2a("si1")
1282         # note: this is a detail of the storage server implementation, and
1283         # may change in the future
1284         prefix = si[:2]
1285         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1286         bucketdir = os.path.join(prefixdir, si)
1287         self.failUnless(os.path.exists(prefixdir), prefixdir)
1288         self.failIf(os.path.exists(bucketdir), bucketdir)
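        # (illustrative sketch, not part of the original test: the layout
        # checked above assumes storage_index_to_dir() maps a storage index
        # to "<prefix>/<si_b32>", roughly:
        #   si_b32 = base32.b2a(storage_index)
        #   path = os.path.join(si_b32[:2], si_b32)
        # so deleting the last share removes the <si_b32> bucket directory
        # but leaves the shared two-character prefix directory behind.)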
1289
1290 class Stats(unittest.TestCase):
1291
1292     def setUp(self):
1293         self.sparent = LoggingServiceParent()
1294         self._lease_secret = itertools.count()
1295     def tearDown(self):
1296         return self.sparent.stopService()
1297
1298     def workdir(self, name):
1299         basedir = os.path.join("storage", "Stats", name)
1300         return basedir
1301
1302     def create(self, name):
1303         workdir = self.workdir(name)
1304         ss = StorageServer(workdir, "\x00" * 20)
1305         ss.setServiceParent(self.sparent)
1306         return ss
1307
1308     def test_latencies(self):
1309         ss = self.create("test_latencies")
1310         for i in range(10000):
1311             ss.add_latency("allocate", 1.0 * i)
1312         for i in range(1000):
1313             ss.add_latency("renew", 1.0 * i)
1314         for i in range(10):
1315             ss.add_latency("cancel", 2.0 * i)
1316         ss.add_latency("get", 5.0)
1317
1318         output = ss.get_latencies()
1319
1320         self.failUnlessEqual(sorted(output.keys()),
1321                              sorted(["allocate", "renew", "cancel", "get"]))
1322         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1323         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1324         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1325         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1326         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1327         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1328         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1329         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1330         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1331
1332         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1333         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1334         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1335         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1336         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1337         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1338         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1339         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1340         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1341
1342         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1343         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1344         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1, output)
1345         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1346         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1347         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1348         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1349         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1350         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1351
1352         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1353         self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1354         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1355         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1356         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1357         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1358         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1359         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1360         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
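        # (sketch of the percentile rule the tolerances above assume: with
        # the retained samples sorted, "NN_M_percentile" is taken to be
        # samples[int(fraction * len(samples))]. For the 1000 retained
        # "allocate" samples 9000..9999:
        #   samples = sorted(ss.latencies["allocate"])
        #   p01 = samples[int(0.01 * len(samples))]   # -> 9010
        # which matches the 01_0_percentile check. This is an illustration,
        # not a quote of get_latencies().)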
1361
1362 def remove_tags(s):
1363     s = re.sub(r'<[^>]*>', ' ', s)
1364     s = re.sub(r'\s+', ' ', s)
1365     return s
1366
1367 class MyBucketCountingCrawler(BucketCountingCrawler):
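    # each time the crawler finishes a prefix, fire the next test-supplied
    # Deferred from self.hook_ds (if any), so tests can synchronize with
    # crawl progress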
1368     def finished_prefix(self, cycle, prefix):
1369         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1370         if self.hook_ds:
1371             d = self.hook_ds.pop(0)
1372             d.callback(None)
1373
1374 class MyStorageServer(StorageServer):
1375     def add_bucket_counter(self):
1376         statefile = os.path.join(self.storedir, "bucket_counter.state")
1377         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1378         self.bucket_counter.setServiceParent(self)
1379
1380 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1381
1382     def setUp(self):
1383         self.s = service.MultiService()
1384         self.s.startService()
1385     def tearDown(self):
1386         return self.s.stopService()
1387
1388     def test_bucket_counter(self):
1389         basedir = "storage/BucketCounter/bucket_counter"
1390         fileutil.make_dirs(basedir)
1391         ss = StorageServer(basedir, "\x00" * 20)
1392         # to make sure we capture the bucket-counting-crawler in the middle
1393         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1394         # also make it start sooner than usual.
1395         ss.bucket_counter.slow_start = 0
1396         orig_cpu_slice = ss.bucket_counter.cpu_slice
1397         ss.bucket_counter.cpu_slice = 0
1398         ss.setServiceParent(self.s)
1399
1400         w = StorageStatus(ss)
1401
1402         # this sample is before the crawler has started doing anything
1403         html = w.renderSynchronously()
1404         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1405         s = remove_tags(html)
1406         self.failUnlessIn("Accepting new shares: Yes", s)
1407         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1408         self.failUnlessIn("Total buckets: Not computed yet", s)
1409         self.failUnlessIn("Next crawl in", s)
1410
1411         # give the bucket-counting-crawler one tick to get started. The
1412         # cpu_slice=0 will force it to yield right after it processes the
1413         # first prefix
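        # (fireEventually() is assumed to return a Deferred that fires on a
        # later reactor turn, so each hop through it gives the crawler at
        # least one chance to do some work before we inspect its state)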
1414
1415         d = fireEventually()
1416         def _check(ignored):
1417             # are we really right after the first prefix?
1418             state = ss.bucket_counter.get_state()
1419             if state["last-complete-prefix"] is None:
1420                 d2 = fireEventually()
1421                 d2.addCallback(_check)
1422                 return d2
1423             self.failUnlessEqual(state["last-complete-prefix"],
1424                                  ss.bucket_counter.prefixes[0])
1425             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1426             html = w.renderSynchronously()
1427             s = remove_tags(html)
1428             self.failUnlessIn(" Current crawl ", s)
1429             self.failUnlessIn(" (next work in ", s)
1430         d.addCallback(_check)
1431
1432         # now give it enough time to complete a full cycle
1433         def _watch():
1434             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1435         d.addCallback(lambda ignored: self.poll(_watch))
1436         def _check2(ignored):
1437             ss.bucket_counter.cpu_slice = orig_cpu_slice
1438             html = w.renderSynchronously()
1439             s = remove_tags(html)
1440             self.failUnlessIn("Total buckets: 0 (the number of", s)
1441             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1442         d.addCallback(_check2)
1443         return d
1444
1445     def test_bucket_counter_cleanup(self):
1446         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1447         fileutil.make_dirs(basedir)
1448         ss = StorageServer(basedir, "\x00" * 20)
1449         # to make sure we capture the bucket-counting-crawler in the middle
1450         # of a cycle, we reach in and reduce its maximum slice time to 0.
1451         ss.bucket_counter.slow_start = 0
1452         orig_cpu_slice = ss.bucket_counter.cpu_slice
1453         ss.bucket_counter.cpu_slice = 0
1454         ss.setServiceParent(self.s)
1455
1456         d = fireEventually()
1457
1458         def _after_first_prefix(ignored):
1459             state = ss.bucket_counter.state
1460             if state["last-complete-prefix"] is None:
1461                 d2 = fireEventually()
1462                 d2.addCallback(_after_first_prefix)
1463                 return d2
1464             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1465             # now sneak in and mess with its state, to make sure it cleans up
1466             # properly at the end of the cycle
1467             self.failUnlessEqual(state["last-complete-prefix"],
1468                                  ss.bucket_counter.prefixes[0])
1469             state["bucket-counts"][-12] = {}
1470             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1471             ss.bucket_counter.save_state()
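            # (the bogus keys -12 and "bogusprefix!" can never be produced
            # by a real crawl, so their disappearance at the end of the
            # cycle is proof that the cleanup pass actually ran)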
1472         d.addCallback(_after_first_prefix)
1473
1474         # now give it enough time to complete a cycle
1475         def _watch():
1476             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1477         d.addCallback(lambda ignored: self.poll(_watch))
1478         def _check2(ignored):
1479             ss.bucket_counter.cpu_slice = orig_cpu_slice
1480             s = ss.bucket_counter.get_state()
1481             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1482             self.failIf("bogusprefix!" in s["storage-index-samples"],
1483                         s["storage-index-samples"].keys())
1484         d.addCallback(_check2)
1485         return d
1486
1487     def test_bucket_counter_eta(self):
1488         basedir = "storage/BucketCounter/bucket_counter_eta"
1489         fileutil.make_dirs(basedir)
1490         ss = MyStorageServer(basedir, "\x00" * 20)
1491         ss.bucket_counter.slow_start = 0
1492         # these will be fired inside finished_prefix()
1493         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1494         w = StorageStatus(ss)
1495
1496         d = defer.Deferred()
1497
1498         def _check_1(ignored):
1499             # no ETA is available yet
1500             html = w.renderSynchronously()
1501             s = remove_tags(html)
1502             self.failUnlessIn("complete (next work", s)
1503
1504         def _check_2(ignored):
1505             # one prefix has finished, so an ETA based upon that elapsed time
1506             # should be available.
1507             html = w.renderSynchronously()
1508             s = remove_tags(html)
1509             self.failUnlessIn("complete (ETA ", s)
1510
1511         def _check_3(ignored):
1512             # two prefixes have finished
1513             html = w.renderSynchronously()
1514             s = remove_tags(html)
1515             self.failUnlessIn("complete (ETA ", s)
1516             d.callback("done")
1517
1518         hooks[0].addCallback(_check_1).addErrback(d.errback)
1519         hooks[1].addCallback(_check_2).addErrback(d.errback)
1520         hooks[2].addCallback(_check_3).addErrback(d.errback)
1521
1522         ss.setServiceParent(self.s)
1523         return d
1524
1525 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
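    # force the crawler to yield immediately after the first bucket (by
    # driving cpu_slice negative), then restore a generous slice on the
    # next pass so the remainder of the cycle runs quickly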
1526     stop_after_first_bucket = False
1527     def process_bucket(self, *args, **kwargs):
1528         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1529         if self.stop_after_first_bucket:
1530             self.stop_after_first_bucket = False
1531             self.cpu_slice = -1.0
1532     def yielding(self, sleep_time):
1533         if not self.stop_after_first_bucket:
1534             self.cpu_slice = 500
1535
1536 class BrokenStatResults:
1537     pass
1538 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
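    # simulate a platform whose os.stat() results lack the st_blocks field,
    # by copying every other non-private attribute onto a plain object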
1539     def stat(self, fn):
1540         s = os.stat(fn)
1541         bsr = BrokenStatResults()
1542         for attrname in dir(s):
1543             if attrname.startswith("_"):
1544                 continue
1545             if attrname == "st_blocks":
1546                 continue
1547             setattr(bsr, attrname, getattr(s, attrname))
1548         return bsr
1549
1550 class InstrumentedStorageServer(StorageServer):
1551     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1552 class No_ST_BLOCKS_StorageServer(StorageServer):
1553     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1554
1555 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1556
1557     def setUp(self):
1558         self.s = service.MultiService()
1559         self.s.startService()
1560     def tearDown(self):
1561         return self.s.stopService()
1562
1563     def make_shares(self, ss):
1564         def make(si):
1565             return (si, hashutil.tagged_hash("renew", si),
1566                     hashutil.tagged_hash("cancel", si))
1567         def make_mutable(si):
1568             return (si, hashutil.tagged_hash("renew", si),
1569                     hashutil.tagged_hash("cancel", si),
1570                     hashutil.tagged_hash("write-enabler", si))
1571         def make_extra_lease(si, num):
1572             return (hashutil.tagged_hash("renew-%d" % num, si),
1573                     hashutil.tagged_hash("cancel-%d" % num, si))
1574
1575         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1576         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1577         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1578         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1579         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1580         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1581         sharenums = [0]
1582         canary = FakeCanary()
1583         # note: 'tahoe debug dump-share' will not handle this file, since the
1584         # inner contents are not a valid CHK share
1585         data = "\xff" * 1000
1586
1587         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1588                                          1000, canary)
1589         w[0].remote_write(0, data)
1590         w[0].remote_close()
1591
1592         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1593                                          1000, canary)
1594         w[0].remote_write(0, data)
1595         w[0].remote_close()
1596         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1597
1598         writev = ss.remote_slot_testv_and_readv_and_writev
1599         writev(mutable_si_2, (we2, rs2, cs2),
1600                {0: ([], [(0,data)], len(data))}, [])
1601         writev(mutable_si_3, (we3, rs3, cs3),
1602                {0: ([], [(0,data)], len(data))}, [])
1603         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1604
1605         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1606         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1607         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1608
1609     def test_basic(self):
1610         basedir = "storage/LeaseCrawler/basic"
1611         fileutil.make_dirs(basedir)
1612         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1613         # make it start sooner than usual.
1614         lc = ss.lease_checker
1615         lc.slow_start = 0
1616         lc.cpu_slice = 500
1617         lc.stop_after_first_bucket = True
1618         webstatus = StorageStatus(ss)
1619
1620         # create a few shares, with some leases on them
1621         self.make_shares(ss)
1622         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1623
1624         # add a non-sharefile to exercise another code path
1625         fn = os.path.join(ss.sharedir,
1626                           storage_index_to_dir(immutable_si_0),
1627                           "not-a-share")
1628         f = open(fn, "wb")
1629         f.write("I am not a share.\n")
1630         f.close()
1631
1632         # this is before the crawl has started, so we're not in a cycle yet
1633         initial_state = lc.get_state()
1634         self.failIf(lc.get_progress()["cycle-in-progress"])
1635         self.failIfIn("cycle-to-date", initial_state)
1636         self.failIfIn("estimated-remaining-cycle", initial_state)
1637         self.failIfIn("estimated-current-cycle", initial_state)
1638         self.failUnlessIn("history", initial_state)
1639         self.failUnlessEqual(initial_state["history"], {})
1640
1641         ss.setServiceParent(self.s)
1642
1643         DAY = 24*60*60
1644
1645         d = fireEventually()
1646
1647         # now examine the state right after the first bucket has been
1648         # processed.
1649         def _after_first_bucket(ignored):
1650             initial_state = lc.get_state()
1651             if "cycle-to-date" not in initial_state:
1652                 d2 = fireEventually()
1653                 d2.addCallback(_after_first_bucket)
1654                 return d2
1655             self.failUnlessIn("cycle-to-date", initial_state)
1656             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1657             self.failUnlessIn("estimated-current-cycle", initial_state)
1658             self.failUnlessIn("history", initial_state)
1659             self.failUnlessEqual(initial_state["history"], {})
1660
1661             so_far = initial_state["cycle-to-date"]
1662             self.failUnlessEqual(so_far["expiration-enabled"], False)
1663             self.failUnlessIn("configured-expiration-mode", so_far)
1664             self.failUnlessIn("lease-age-histogram", so_far)
1665             lah = so_far["lease-age-histogram"]
1666             self.failUnlessEqual(type(lah), list)
1667             self.failUnlessEqual(len(lah), 1)
1668             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
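            # (each histogram entry is taken to be (min_age, max_age, count),
            # so one lease aged under a day lands in the first DAY-wide bin)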
1669             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1670             self.failUnlessEqual(so_far["corrupt-shares"], [])
1671             sr1 = so_far["space-recovered"]
1672             self.failUnlessEqual(sr1["examined-buckets"], 1)
1673             self.failUnlessEqual(sr1["examined-shares"], 1)
1674             self.failUnlessEqual(sr1["actual-shares"], 0)
1675             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1676             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1677             left = initial_state["estimated-remaining-cycle"]
1678             sr2 = left["space-recovered"]
1679             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1680             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1681             self.failIfEqual(sr2["actual-shares"], None)
1682             self.failIfEqual(sr2["configured-diskbytes"], None)
1683             self.failIfEqual(sr2["original-sharebytes"], None)
1684         d.addCallback(_after_first_bucket)
1685         d.addCallback(lambda ign: self.render1(webstatus))
1686         def _check_html_in_cycle(html):
1687             s = remove_tags(html)
1688             self.failUnlessIn("So far, this cycle has examined "
1689                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1690             self.failUnlessIn("and has recovered: "
1691                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1692                               "0 B (0 B / 0 B)", s)
1693             self.failUnlessIn("If expiration were enabled, "
1694                               "we would have recovered: "
1695                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1696                               " 0 B (0 B / 0 B) by now", s)
1697             self.failUnlessIn("and the remainder of this cycle "
1698                               "would probably recover: "
1699                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1700                               " 0 B (0 B / 0 B)", s)
1701             self.failUnlessIn("and the whole cycle would probably recover: "
1702                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1703                               " 0 B (0 B / 0 B)", s)
1704             self.failUnlessIn("if we were strictly using each lease's default "
1705                               "31-day lease lifetime", s)
1706             self.failUnlessIn("this cycle would be expected to recover: ", s)
1707         d.addCallback(_check_html_in_cycle)
1708
1709         # wait for the crawler to finish the first cycle. Nothing should have
1710         # been removed.
1711         def _wait():
1712             return bool(lc.get_state()["last-cycle-finished"] is not None)
1713         d.addCallback(lambda ign: self.poll(_wait))
1714
1715         def _after_first_cycle(ignored):
1716             s = lc.get_state()
1717             self.failIf("cycle-to-date" in s)
1718             self.failIf("estimated-remaining-cycle" in s)
1719             self.failIf("estimated-current-cycle" in s)
1720             last = s["history"][0]
1721             self.failUnlessIn("cycle-start-finish-times", last)
1722             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1723             self.failUnlessEqual(last["expiration-enabled"], False)
1724             self.failUnlessIn("configured-expiration-mode", last)
1725
1726             self.failUnlessIn("lease-age-histogram", last)
1727             lah = last["lease-age-histogram"]
1728             self.failUnlessEqual(type(lah), list)
1729             self.failUnlessEqual(len(lah), 1)
1730             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1731
1732             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1733             self.failUnlessEqual(last["corrupt-shares"], [])
1734
1735             rec = last["space-recovered"]
1736             self.failUnlessEqual(rec["examined-buckets"], 4)
1737             self.failUnlessEqual(rec["examined-shares"], 4)
1738             self.failUnlessEqual(rec["actual-buckets"], 0)
1739             self.failUnlessEqual(rec["original-buckets"], 0)
1740             self.failUnlessEqual(rec["configured-buckets"], 0)
1741             self.failUnlessEqual(rec["actual-shares"], 0)
1742             self.failUnlessEqual(rec["original-shares"], 0)
1743             self.failUnlessEqual(rec["configured-shares"], 0)
1744             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1745             self.failUnlessEqual(rec["original-diskbytes"], 0)
1746             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1747             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1748             self.failUnlessEqual(rec["original-sharebytes"], 0)
1749             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1750
1751             def _get_sharefile(si):
1752                 return list(ss._iter_share_files(si))[0]
1753             def count_leases(si):
1754                 return len(list(_get_sharefile(si).get_leases()))
1755             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1756             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1757             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1758             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1759         d.addCallback(_after_first_cycle)
1760         d.addCallback(lambda ign: self.render1(webstatus))
1761         def _check_html(html):
1762             s = remove_tags(html)
1763             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1764                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1765             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1766                               "(2 mutable / 2 immutable),", s)
1767             self.failUnlessIn("but expiration was not enabled", s)
1768         d.addCallback(_check_html)
1769         d.addCallback(lambda ign: self.render_json(webstatus))
1770         def _check_json(json):
1771             data = simplejson.loads(json)
1772             self.failUnlessIn("lease-checker", data)
1773             self.failUnlessIn("lease-checker-progress", data)
1774         d.addCallback(_check_json)
1775         return d
1776
1777     def backdate_lease(self, sf, renew_secret, new_expire_time):
1778         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1779         # "renew" a lease with a new_expire_time that is older than what the
1780         # current lease has), so we have to reach inside it.
1781         for i,lease in enumerate(sf.get_leases()):
1782             if lease.renew_secret == renew_secret:
1783                 lease.expiration_time = new_expire_time
1784                 f = open(sf.home, 'rb+')
1785                 sf._write_lease_record(f, i, lease)
1786                 f.close()
1787                 return
1788         raise IndexError("unable to renew non-existent lease")
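    # (note on the reach-inside above: _write_lease_record() is presumed to
    # seek to the i'th fixed-size lease slot in the container and overwrite
    # it in place, which is why the share file is opened "rb+" rather than
    # rewritten from scratch)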
1789
1790     def test_expire_age(self):
1791         basedir = "storage/LeaseCrawler/expire_age"
1792         fileutil.make_dirs(basedir)
1793         # setting expiration_override_lease_duration to 2000 means that
1794         # any lease which is more than 2000s old will be expired.
1795         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1796                                        expiration_enabled=True,
1797                                        expiration_mode="age",
1798                                        expiration_override_lease_duration=2000)
1799         # make it start sooner than usual.
1800         lc = ss.lease_checker
1801         lc.slow_start = 0
1802         lc.stop_after_first_bucket = True
1803         webstatus = StorageStatus(ss)
1804
1805         # create a few shares, with some leases on them
1806         self.make_shares(ss)
1807         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1808
1809         def count_shares(si):
1810             return len(list(ss._iter_share_files(si)))
1811         def _get_sharefile(si):
1812             return list(ss._iter_share_files(si))[0]
1813         def count_leases(si):
1814             return len(list(_get_sharefile(si).get_leases()))
1815
1816         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1817         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1818         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1819         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1820         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1821         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1822         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1823         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1824
1825         # artificially crank back the expiration time on the first lease of
1826         # each share, to make it look like it expired already (age=1000s).
1827         # Some shares have an extra lease which is set to expire at the
1828         # default time in 31 days from now (age=31days). We then run the
1829         # crawler, which will expire the first lease, making some shares get
1830         # deleted and others stay alive (with one remaining lease)
1831         now = time.time()
1832
1833         sf0 = _get_sharefile(immutable_si_0)
1834         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1835         sf0_size = os.stat(sf0.home).st_size
1836
1837         # immutable_si_1 gets an extra lease
1838         sf1 = _get_sharefile(immutable_si_1)
1839         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1840
1841         sf2 = _get_sharefile(mutable_si_2)
1842         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1843         sf2_size = os.stat(sf2.home).st_size
1844
1845         # mutable_si_3 gets an extra lease
1846         sf3 = _get_sharefile(mutable_si_3)
1847         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1848
1849         ss.setServiceParent(self.s)
1850
1851         d = fireEventually()
1852         # examine the state right after the first bucket has been processed
1853         def _after_first_bucket(ignored):
1854             p = lc.get_progress()
1855             if not p["cycle-in-progress"]:
1856                 d2 = fireEventually()
1857                 d2.addCallback(_after_first_bucket)
1858                 return d2
1859         d.addCallback(_after_first_bucket)
1860         d.addCallback(lambda ign: self.render1(webstatus))
1861         def _check_html_in_cycle(html):
1862             s = remove_tags(html)
1863             # the first bucket encountered gets deleted, and its prefix
1864             # happens to be about 1/5th of the way through the ring, so the
1865             # predictor thinks we'll have 5 shares and that we'll delete them
1866             # all. This part of the test depends upon the SIs landing right
1867             # where they do now.
1868             self.failUnlessIn("The remainder of this cycle is expected to "
1869                               "recover: 4 shares, 4 buckets", s)
1870             self.failUnlessIn("The whole cycle is expected to examine "
1871                               "5 shares in 5 buckets and to recover: "
1872                               "5 shares, 5 buckets", s)
1873         d.addCallback(_check_html_in_cycle)
1874
1875         # wait for the crawler to finish the first cycle. Two shares should
1876         # have been removed
1877         def _wait():
1878             return bool(lc.get_state()["last-cycle-finished"] is not None)
1879         d.addCallback(lambda ign: self.poll(_wait))
1880
1881         def _after_first_cycle(ignored):
1882             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1883             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1884             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1885             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1886             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1887             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1888
1889             s = lc.get_state()
1890             last = s["history"][0]
1891
1892             self.failUnlessEqual(last["expiration-enabled"], True)
1893             self.failUnlessEqual(last["configured-expiration-mode"],
1894                                  ("age", 2000, None, ("mutable", "immutable")))
1895             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1896
1897             rec = last["space-recovered"]
1898             self.failUnlessEqual(rec["examined-buckets"], 4)
1899             self.failUnlessEqual(rec["examined-shares"], 4)
1900             self.failUnlessEqual(rec["actual-buckets"], 2)
1901             self.failUnlessEqual(rec["original-buckets"], 2)
1902             self.failUnlessEqual(rec["configured-buckets"], 2)
1903             self.failUnlessEqual(rec["actual-shares"], 2)
1904             self.failUnlessEqual(rec["original-shares"], 2)
1905             self.failUnlessEqual(rec["configured-shares"], 2)
1906             size = sf0_size + sf2_size
1907             self.failUnlessEqual(rec["actual-sharebytes"], size)
1908             self.failUnlessEqual(rec["original-sharebytes"], size)
1909             self.failUnlessEqual(rec["configured-sharebytes"], size)
1910             # different platforms have different notions of "blocks used by
1911             # this file", so merely assert that it's a number
1912             self.failUnless(rec["actual-diskbytes"] >= 0,
1913                             rec["actual-diskbytes"])
1914             self.failUnless(rec["original-diskbytes"] >= 0,
1915                             rec["original-diskbytes"])
1916             self.failUnless(rec["configured-diskbytes"] >= 0,
1917                             rec["configured-diskbytes"])
1918         d.addCallback(_after_first_cycle)
1919         d.addCallback(lambda ign: self.render1(webstatus))
1920         def _check_html(html):
1921             s = remove_tags(html)
1922             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1923             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1924             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1925         d.addCallback(_check_html)
1926         return d
1927
1928     def test_expire_cutoff_date(self):
1929         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1930         fileutil.make_dirs(basedir)
1931         # setting cutoff-date to 2000 seconds ago means that any lease which
1932         # is more than 2000s old will be expired.
1933         now = time.time()
1934         then = int(now - 2000)
1935         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1936                                        expiration_enabled=True,
1937                                        expiration_mode="cutoff-date",
1938                                        expiration_cutoff_date=then)
1939         # make it start sooner than usual.
1940         lc = ss.lease_checker
1941         lc.slow_start = 0
1942         lc.stop_after_first_bucket = True
1943         webstatus = StorageStatus(ss)
1944
1945         # create a few shares, with some leases on them
1946         self.make_shares(ss)
1947         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1948
1949         def count_shares(si):
1950             return len(list(ss._iter_share_files(si)))
1951         def _get_sharefile(si):
1952             return list(ss._iter_share_files(si))[0]
1953         def count_leases(si):
1954             return len(list(_get_sharefile(si).get_leases()))
1955
1956         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1957         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1958         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1959         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1960         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1961         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1962         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1963         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1964
1965         # artificially crank back the expiration time on the first lease of
1966         # each share, to make it look like it was renewed 3000s ago. To achieve
1967         # this, we need to set the expiration time to now-3000+31days. This
1968         # will change when the lease format is improved to contain both
1969         # create/renew time and duration.
1970         new_expiration_time = now - 3000 + 31*24*60*60
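        # (worked out: 31*24*60*60 = 2678400, so this expiration time lies
        # 2678400 - 3000 seconds in the future, making the lease look as if
        # it was last renewed 3000s ago -- older than the 2000s-ago cutoff)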
1971
1972         # Some shares have an extra lease which is set to expire at the
1973         # default time in 31 days from now (age=31days). We then run the
1974         # crawler, which will expire the first lease, making some shares get
1975         # deleted and others stay alive (with one remaining lease)
1976
1977         sf0 = _get_sharefile(immutable_si_0)
1978         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1979         sf0_size = os.stat(sf0.home).st_size
1980
1981         # immutable_si_1 gets an extra lease
1982         sf1 = _get_sharefile(immutable_si_1)
1983         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1984
1985         sf2 = _get_sharefile(mutable_si_2)
1986         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1987         sf2_size = os.stat(sf2.home).st_size
1988
1989         # mutable_si_3 gets an extra lease
1990         sf3 = _get_sharefile(mutable_si_3)
1991         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1992
1993         ss.setServiceParent(self.s)
1994
1995         d = fireEventually()
1996         # examine the state right after the first bucket has been processed
1997         def _after_first_bucket(ignored):
1998             p = lc.get_progress()
1999             if not p["cycle-in-progress"]:
2000                 d2 = fireEventually()
2001                 d2.addCallback(_after_first_bucket)
2002                 return d2
2003         d.addCallback(_after_first_bucket)
2004         d.addCallback(lambda ign: self.render1(webstatus))
2005         def _check_html_in_cycle(html):
2006             s = remove_tags(html)
2007             # the first bucket encountered gets deleted, and its prefix
2008             # happens to be about 1/5th of the way through the ring, so the
2009             # predictor thinks we'll have 5 shares and that we'll delete them
2010             # all. This part of the test depends upon the SIs landing right
2011             # where they do now.
2012             self.failUnlessIn("The remainder of this cycle is expected to "
2013                               "recover: 4 shares, 4 buckets", s)
2014             self.failUnlessIn("The whole cycle is expected to examine "
2015                               "5 shares in 5 buckets and to recover: "
2016                               "5 shares, 5 buckets", s)
2017         d.addCallback(_check_html_in_cycle)
2018
2019         # wait for the crawler to finish the first cycle. Two shares should
2020         # have been removed
2021         def _wait():
2022             return bool(lc.get_state()["last-cycle-finished"] is not None)
2023         d.addCallback(lambda ign: self.poll(_wait))
2024
2025         def _after_first_cycle(ignored):
2026             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2027             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2028             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2029             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2030             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2031             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2032
2033             s = lc.get_state()
2034             last = s["history"][0]
2035
2036             self.failUnlessEqual(last["expiration-enabled"], True)
2037             self.failUnlessEqual(last["configured-expiration-mode"],
2038                                  ("cutoff-date", None, then,
2039                                   ("mutable", "immutable")))
2040             self.failUnlessEqual(last["leases-per-share-histogram"],
2041                                  {1: 2, 2: 2})
2042
2043             rec = last["space-recovered"]
2044             self.failUnlessEqual(rec["examined-buckets"], 4)
2045             self.failUnlessEqual(rec["examined-shares"], 4)
2046             self.failUnlessEqual(rec["actual-buckets"], 2)
2047             self.failUnlessEqual(rec["original-buckets"], 0)
2048             self.failUnlessEqual(rec["configured-buckets"], 2)
2049             self.failUnlessEqual(rec["actual-shares"], 2)
2050             self.failUnlessEqual(rec["original-shares"], 0)
2051             self.failUnlessEqual(rec["configured-shares"], 2)
2052             size = sf0_size + sf2_size
2053             self.failUnlessEqual(rec["actual-sharebytes"], size)
2054             self.failUnlessEqual(rec["original-sharebytes"], 0)
2055             self.failUnlessEqual(rec["configured-sharebytes"], size)
2056             # different platforms have different notions of "blocks used by
2057             # this file", so merely assert that it's a number
2058             self.failUnless(rec["actual-diskbytes"] >= 0,
2059                             rec["actual-diskbytes"])
2060             self.failUnless(rec["original-diskbytes"] >= 0,
2061                             rec["original-diskbytes"])
2062             self.failUnless(rec["configured-diskbytes"] >= 0,
2063                             rec["configured-diskbytes"])
2064         d.addCallback(_after_first_cycle)
2065         d.addCallback(lambda ign: self.render1(webstatus))
2066         def _check_html(html):
2067             s = remove_tags(html)
2068             self.failUnlessIn("Expiration Enabled:"
2069                               " expired leases will be removed", s)
2070             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2071             substr = "Leases created or last renewed before %s will be considered expired." % date
2072             self.failUnlessIn(substr, s)
2073             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2074         d.addCallback(_check_html)
2075         return d
2076
2077     def test_only_immutable(self):
2078         basedir = "storage/LeaseCrawler/only_immutable"
2079         fileutil.make_dirs(basedir)
2080         now = time.time()
2081         then = int(now - 2000)
2082         ss = StorageServer(basedir, "\x00" * 20,
2083                            expiration_enabled=True,
2084                            expiration_mode="cutoff-date",
2085                            expiration_cutoff_date=then,
2086                            expiration_sharetypes=("immutable",))
2087         lc = ss.lease_checker
2088         lc.slow_start = 0
2089         webstatus = StorageStatus(ss)
2090
2091         self.make_shares(ss)
2092         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2093         # set all leases to be expirable
2094         new_expiration_time = now - 3000 + 31*24*60*60
2095
2096         def count_shares(si):
2097             return len(list(ss._iter_share_files(si)))
2098         def _get_sharefile(si):
2099             return list(ss._iter_share_files(si))[0]
2100         def count_leases(si):
2101             return len(list(_get_sharefile(si).get_leases()))
2102
2103         sf0 = _get_sharefile(immutable_si_0)
2104         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2105         sf1 = _get_sharefile(immutable_si_1)
2106         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2107         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2108         sf2 = _get_sharefile(mutable_si_2)
2109         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2110         sf3 = _get_sharefile(mutable_si_3)
2111         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2112         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2113
2114         ss.setServiceParent(self.s)
2115         def _wait():
2116             return bool(lc.get_state()["last-cycle-finished"] is not None)
2117         d = self.poll(_wait)
2118
2119         def _after_first_cycle(ignored):
2120             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2121             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2122             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2123             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2124             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2125             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2126         d.addCallback(_after_first_cycle)
2127         d.addCallback(lambda ign: self.render1(webstatus))
2128         def _check_html(html):
2129             s = remove_tags(html)
2130             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2131         d.addCallback(_check_html)
2132         return d
2133
2134     def test_only_mutable(self):
2135         basedir = "storage/LeaseCrawler/only_mutable"
2136         fileutil.make_dirs(basedir)
2137         now = time.time()
2138         then = int(now - 2000)
2139         ss = StorageServer(basedir, "\x00" * 20,
2140                            expiration_enabled=True,
2141                            expiration_mode="cutoff-date",
2142                            expiration_cutoff_date=then,
2143                            expiration_sharetypes=("mutable",))
2144         lc = ss.lease_checker
2145         lc.slow_start = 0
2146         webstatus = StorageStatus(ss)
2147
2148         self.make_shares(ss)
2149         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2150         # set all leases to be expirable
2151         new_expiration_time = now - 3000 + 31*24*60*60
2152
2153         def count_shares(si):
2154             return len(list(ss._iter_share_files(si)))
2155         def _get_sharefile(si):
2156             return list(ss._iter_share_files(si))[0]
2157         def count_leases(si):
2158             return len(list(_get_sharefile(si).get_leases()))
2159
2160         sf0 = _get_sharefile(immutable_si_0)
2161         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2162         sf1 = _get_sharefile(immutable_si_1)
2163         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2164         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2165         sf2 = _get_sharefile(mutable_si_2)
2166         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2167         sf3 = _get_sharefile(mutable_si_3)
2168         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2169         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2170
2171         ss.setServiceParent(self.s)
2172         def _wait():
2173             return bool(lc.get_state()["last-cycle-finished"] is not None)
2174         d = self.poll(_wait)
2175
2176         def _after_first_cycle(ignored):
2177             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2178             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2179             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2180             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2181             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2182             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2183         d.addCallback(_after_first_cycle)
2184         d.addCallback(lambda ign: self.render1(webstatus))
2185         def _check_html(html):
2186             s = remove_tags(html)
2187             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2188         d.addCallback(_check_html)
2189         return d
2190
2191     def test_bad_mode(self):
2192         basedir = "storage/LeaseCrawler/bad_mode"
2193         fileutil.make_dirs(basedir)
2194         e = self.failUnlessRaises(ValueError,
2195                                   StorageServer, basedir, "\x00" * 20,
2196                                   expiration_mode="bogus")
2197         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2198
2199     def test_parse_duration(self):
2200         DAY = 24*60*60
2201         MONTH = 31*DAY
2202         YEAR = 365*DAY
2203         p = time_format.parse_duration
2204         self.failUnlessEqual(p("7days"), 7*DAY)
2205         self.failUnlessEqual(p("31day"), 31*DAY)
2206         self.failUnlessEqual(p("60 days"), 60*DAY)
2207         self.failUnlessEqual(p("2mo"), 2*MONTH)
2208         self.failUnlessEqual(p("3 month"), 3*MONTH)
2209         self.failUnlessEqual(p("2years"), 2*YEAR)
2210         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2211         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
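    # (a minimal sketch of what time_format.parse_duration is assumed to do,
    # for illustration only: match a leading integer, map the unit prefix to
    # a multiplier, and raise ValueError otherwise --
    #   m = re.match(r"^\s*(\d+)\s*(day|mo|year)", s)
    #   if not m:
    #       raise ValueError("no unit (like day, month, or year) in '%s'" % s)
    #   multiplier = {"day": 24*60*60, "mo": 31*24*60*60,
    #                 "year": 365*24*60*60}[m.group(2)]
    #   return int(m.group(1)) * multiplier
    # -- "3 month" matches on the "mo" prefix, so both spellings work.)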
2212
2213     def test_parse_date(self):
2214         p = time_format.parse_date
2215         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2216         self.failUnlessEqual(p("2009-03-18"), 1237334400)
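    # (sanity arithmetic for the constant above: 2009-03-18 is 14321 days
    # after the 1970-01-01 epoch, and 14321 * 86400 == 1237334400)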
2217
2218     def test_limited_history(self):
2219         basedir = "storage/LeaseCrawler/limited_history"
2220         fileutil.make_dirs(basedir)
2221         ss = StorageServer(basedir, "\x00" * 20)
2222         # make it start sooner than usual.
2223         lc = ss.lease_checker
2224         lc.slow_start = 0
2225         lc.cpu_slice = 500
2226
2227         # create a few shares, with some leases on them
2228         self.make_shares(ss)
2229
2230         ss.setServiceParent(self.s)
2231
2232         def _wait_until_15_cycles_done():
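            # lc.timer is assumed to be the IDelayedCall driving the next
            # crawler slice; resetting it to 0 below makes cycles run
            # back-to-back instead of waiting out the usual sleep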
2233             last = lc.state["last-cycle-finished"]
2234             if last is not None and last >= 15:
2235                 return True
2236             if lc.timer:
2237                 lc.timer.reset(0)
2238             return False
2239         d = self.poll(_wait_until_15_cycles_done)
2240
2241         def _check(ignored):
2242             s = lc.get_state()
2243             h = s["history"]
2244             self.failUnlessEqual(len(h), 10)
2245             self.failUnlessEqual(max(h.keys()), 15)
2246             self.failUnlessEqual(min(h.keys()), 6)
2247         d.addCallback(_check)
2248         return d
2249
2250     def test_unpredictable_future(self):
2251         basedir = "storage/LeaseCrawler/unpredictable_future"
2252         fileutil.make_dirs(basedir)
2253         ss = StorageServer(basedir, "\x00" * 20)
2254         # make it start sooner than usual.
2255         lc = ss.lease_checker
2256         lc.slow_start = 0
2257         lc.cpu_slice = -1.0 # stop quickly
2258
2259         self.make_shares(ss)
2260
2261         ss.setServiceParent(self.s)
2262
2263         d = fireEventually()
2264         def _check(ignored):
2265             # this should fire after the first bucket is complete, but before
2266             # the first prefix is complete, so the progress-measurer won't
2267             # think we've gotten far enough to raise our percent-complete
2268             # above 0%, triggering the cannot-predict-the-future code in
2269             # expirer.py . This will have to change if/when the
2270             # expirer.py. This will have to change if/when the
2271             # have to interrupt it even earlier, before it's finished the
2272             # first bucket).
2273             s = lc.get_state()
2274             if "cycle-to-date" not in s:
2275                 d2 = fireEventually()
2276                 d2.addCallback(_check)
2277                 return d2
2278             self.failUnlessIn("cycle-to-date", s)
2279             self.failUnlessIn("estimated-remaining-cycle", s)
2280             self.failUnlessIn("estimated-current-cycle", s)
2281
2282             left = s["estimated-remaining-cycle"]["space-recovered"]
2283             self.failUnlessEqual(left["actual-buckets"], None)
2284             self.failUnlessEqual(left["original-buckets"], None)
2285             self.failUnlessEqual(left["configured-buckets"], None)
2286             self.failUnlessEqual(left["actual-shares"], None)
2287             self.failUnlessEqual(left["original-shares"], None)
2288             self.failUnlessEqual(left["configured-shares"], None)
2289             self.failUnlessEqual(left["actual-diskbytes"], None)
2290             self.failUnlessEqual(left["original-diskbytes"], None)
2291             self.failUnlessEqual(left["configured-diskbytes"], None)
2292             self.failUnlessEqual(left["actual-sharebytes"], None)
2293             self.failUnlessEqual(left["original-sharebytes"], None)
2294             self.failUnlessEqual(left["configured-sharebytes"], None)
2295
2296             full = s["estimated-current-cycle"]["space-recovered"]
2297             self.failUnlessEqual(full["actual-buckets"], None)
2298             self.failUnlessEqual(full["original-buckets"], None)
2299             self.failUnlessEqual(full["configured-buckets"], None)
2300             self.failUnlessEqual(full["actual-shares"], None)
2301             self.failUnlessEqual(full["original-shares"], None)
2302             self.failUnlessEqual(full["configured-shares"], None)
2303             self.failUnlessEqual(full["actual-diskbytes"], None)
2304             self.failUnlessEqual(full["original-diskbytes"], None)
2305             self.failUnlessEqual(full["configured-diskbytes"], None)
2306             self.failUnlessEqual(full["actual-sharebytes"], None)
2307             self.failUnlessEqual(full["original-sharebytes"], None)
2308             self.failUnlessEqual(full["configured-sharebytes"], None)
2309
2310         d.addCallback(_check)
2311         return d
2312
2313     def test_no_st_blocks(self):
2314         basedir = "storage/LeaseCrawler/no_st_blocks"
2315         fileutil.make_dirs(basedir)
2316         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2317                                         expiration_mode="age",
2318                                         expiration_override_lease_duration=-1000)
        # A negative expiration_override_lease_duration means the
        # "configured-" space-recovered counts will be non-zero, since all
        # shares will have expired by then.
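        # (In "age" mode a lease granted at time T with duration -1000 is
        # treated as expiring at T-1000, i.e. already in the past, so the
        # crawler counts every share as expired.)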

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)
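        # PollMixin.poll() re-runs _wait until it returns True, so _check
        # below only fires after the first full cycle has been recorded.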

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
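            # (The crawler presumably derives diskbytes from st_blocks * 512,
            # the POSIX block unit, and falls back to st_size, the same
            # number as sharebytes, when st_blocks is absent; that fallback
            # is what makes the two counts equal here.)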
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
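        # Roughly (a sketch, not verbatim), the dispatch in get_share_file()
        # looks like:
        #   prefix = open(fn, "rb").read(32)
        #   if prefix == MutableShareFile.MAGIC:
        #       return MutableShareFile(fn)
        #   return ShareFile(fn)  # immutable; its version check then raises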

        # also create an empty bucket
        empty_si = "\x04"*16
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
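            # each corrupt-shares entry is a (storage-index, shnum) pair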
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
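            # e.g. simplejson.loads(simplejson.dumps({0: ("si", 0)})) yields
            # {"0": ["si", 0]}, hence the list here and the string key in
            # _check_json_history below.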
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
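            # 5 buckets = the 4 from make_shares() plus the empty one;
            # 3 shares = the 4 created minus the corrupted one, which is
            # reported via corrupt-shares instead of being examined.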
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_no_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = AttributeError()

        # Some platforms may have no disk stats API. Make sure the code
        # can handle that (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_bad_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = OSError()

        # If the API to get disk stats exists but a call to it fails, then
        # the status should show that no shares will be accepted, and
        # get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)
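        # Contrast with test_status_no_disk_stats above: a missing stats API
        # leaves the available space unknown (None) and keeps accepting
        # shares, while a failing API is treated as "assume full": 0 bytes
        # available and no new shares accepted.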

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e15)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 15.00 PB (15000000000000000)", s)
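        # (The point of the huge value is to exercise the largest
        # abbreviation tier and confirm that a reservation far bigger than
        # any real disk still renders correctly.)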

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
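        # render_space gives the exact byte count and render_abbrev_space
        # the human-readable form; the status page shows both, as in
        # "Reserved space: - 10.00 MB (10000000)" above.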
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)