
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

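# FakeCanary stands in for the Foolscap "canary": a remote reference whose
# notifyOnDisconnect() callbacks the storage server uses to learn when a
# client has gone away. It just records the callbacks, so a test can fire
# them by hand to simulate a disconnection, as test_disconnect below does:
#   for (f, args, kwargs) in canary.disconnectors.values():
#       f(*args, **kwargs)
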
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

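# RemoteBucket gives the proxy tests a local stand-in for a Foolscap
# RemoteReference: assign a BucketWriter or BucketReader to .target, and
# callRemote("foo", ...) dispatches to target.remote_foo(...), wrapped in a
# Deferred. For example, rb.callRemote("read", 0, 25) fires with the result
# of rb.target.remote_read(0, 25).
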
class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
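        # quick sanity check of the arithmetic above: 0x24 is 36 and 0x44 is
        # 68, so the two header sizes must yield 1414 or 1446 bytes
        assert sharesize in (1414, 1446), sharesize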

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class FakeDiskStorageServer(StorageServer):
    DISKAVAIL = 0
    def get_disk_stats(self):
        return { 'free_for_nonroot': self.DISKAVAIL, 'avail': max(self.DISKAVAIL - self.reserved_space, 0), }

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real calls to get_disk_stats
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
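        # (those 72 bytes are the on-disk lease record: a 4-byte owner
        # number, a 32-byte renew secret, a 32-byte cancel secret, and a
        # 4-byte expiration time)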
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, which grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
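        assert allocated == 1085  # 1001 + 12 + 72, matching the comment below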

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_disk_stats(self):
        # This will spuriously fail if there is zero disk space left (but so will other tests).
        ss = self.create("test_disk_stats", reserved_space=0)

        disk = ss.get_disk_stats()
        self.failUnless(disk['total'] > 0, disk['total'])
        self.failUnless(disk['used'] > 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        ss = self.create("test_disk_stats_avail_nonnegative", reserved_space=2**64)

        disk = ss.get_disk_stats()
        self.failUnlessEqual(disk['avail'], 0)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
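        # the gap between offset 5 and offset 100 was never written; whether
        # or not the filesystem stores it as a sparse hole, it reads back as
        # NUL bytes, i.e. f2.seek(5); f2.read(95) == "\x00"*95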


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

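    # A note on the slot_testv_and_readv_and_writev protocol exercised
    # throughout this class: each share number maps to a (test_vector,
    # write_vector, new_length) triple. A test vector is a list of
    # (offset, length, operator, specimen) tuples, a write vector is a list
    # of (offset, data) tuples, and new_length (if not None) resizes the
    # container. For example,
    #   {0: ([(0, 5, "eq", "hello")], [(5, " world")], None)}
    # writes " world" at offset 5 of share 0 only if the share's first five
    # bytes currently equal "hello". The call returns (testv_is_good,
    # read_data), where read_data maps each extant share number to the
    # results of the read vector.
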
    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
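        # (that nodeid is the server's "\x00"*20 ID: base32 of twenty zero
        # bytes is thirty-two 'a' characters)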

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

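        # Each test below reads 5 bytes at offset 10 of share 0 (the
        # substring "11111" of the freshly reset data) and compares it to
        # the specimen with the named operator. The write of 100 "x" or "y"
        # bytes happens only if the comparison succeeds, and the read vector
        # [(10,5)] always reports the pre-write contents.
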
        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
1088         rc = write("si1", secrets,
1089                    {0: ([], [(0,data[0])], None),
1090                     1: ([], [(0,data[1])], None),
1091                     2: ([], [(0,data[2])], None),
1092                     }, [])
1093         self.failUnlessEqual(rc, (True, {}))
1094
1095         answer = read("si1", [], [(0, 10)])
1096         self.failUnlessEqual(answer, {0: ["0"*10],
1097                                       1: ["1"*10],
1098                                       2: ["2"*10]})
1099
1100     def compare_leases_without_timestamps(self, leases_a, leases_b):
1101         self.failUnlessEqual(len(leases_a), len(leases_b))
1102         for i in range(len(leases_a)):
1103             a = leases_a[i]
1104             b = leases_b[i]
1105             self.failUnlessEqual(a.owner_num,       b.owner_num)
1106             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1107             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1108             self.failUnlessEqual(a.nodeid,          b.nodeid)
1109
1110     def compare_leases(self, leases_a, leases_b):
1111         self.failUnlessEqual(len(leases_a), len(leases_b))
1112         for i in range(len(leases_a)):
1113             a = leases_a[i]
1114             b = leases_b[i]
1115             self.failUnlessEqual(a.owner_num,       b.owner_num)
1116             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1117             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1118             self.failUnlessEqual(a.nodeid,          b.nodeid)
1119             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1120
1121     def test_leases(self):
1122         ss = self.create("test_leases")
1123         def secrets(n):
1124             return ( self.write_enabler("we1"),
1125                      self.renew_secret("we1-%d" % n),
1126                      self.cancel_secret("we1-%d" % n) )
1127         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1128         write = ss.remote_slot_testv_and_readv_and_writev
1129         read = ss.remote_slot_readv
1130         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1131         self.failUnlessEqual(rc, (True, {}))
1132
1133         # create a random non-numeric file in the bucket directory, to
1134         # exercise the code that's supposed to ignore those.
1135         bucket_dir = os.path.join(self.workdir("test_leases"),
1136                                   "shares", storage_index_to_dir("si1"))
1137         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1138         f.write("you ought to be ignoring me\n")
1139         f.close()
1140
1141         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1142         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1143
1144         # add-lease on a missing storage index is silently ignored
1145         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1146
1147         # re-allocate the slots and use the same secrets, that should update
1148         # the lease
1149         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1150         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1151
1152         # renew it directly
1153         ss.remote_renew_lease("si1", secrets(0)[1])
1154         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1155
1156         # now allocate them with a bunch of different secrets, to trigger the
1157         # extended lease code. Use add_lease for one of them.
1158         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1159         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1160         secrets2 = secrets(2)
1161         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1162         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1163         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1164         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1165         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1166
1167         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1168
1169         # cancel one of them
1170         ss.remote_cancel_lease("si1", secrets(5)[2])
1171         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1172
1173         all_leases = list(s0.get_leases())
1174         # and write enough data to expand the container, forcing the server
1175         # to move the leases
1176         write("si1", secrets(0),
1177               {0: ([], [(0,data)], 200), },
1178               [])
1179
1180         # read back the leases, make sure they're still intact.
1181         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1182
1183         ss.remote_renew_lease("si1", secrets(0)[1])
1184         ss.remote_renew_lease("si1", secrets(1)[1])
1185         ss.remote_renew_lease("si1", secrets(2)[1])
1186         ss.remote_renew_lease("si1", secrets(3)[1])
1187         ss.remote_renew_lease("si1", secrets(4)[1])
1188         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1189         # get a new copy of the leases, with the current timestamps. Reading
1190         # data and failing to renew/cancel leases should leave the timestamps
1191         # alone.
1192         all_leases = list(s0.get_leases())
1193         # renewing with a bogus token should prompt an error message
1194
1195         # examine the exception thus raised, make sure the old nodeid is
1196         # present, to provide for share migration
1197         e = self.failUnlessRaises(IndexError,
1198                                   ss.remote_renew_lease, "si1",
1199                                   secrets(20)[1])
1200         e_s = str(e)
1201         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1202         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1203         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1204
1205         # same for cancelling
1206         self.failUnlessRaises(IndexError,
1207                               ss.remote_cancel_lease, "si1",
1208                               secrets(20)[2])
1209         self.compare_leases(all_leases, list(s0.get_leases()))
1210
1211         # reading shares should not modify the timestamp
1212         read("si1", [], [(0,200)])
1213         self.compare_leases(all_leases, list(s0.get_leases()))
1214
1215         write("si1", secrets(0),
1216               {0: ([], [(200, "make me bigger")], None)}, [])
1217         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1218
1219         write("si1", secrets(0),
1220               {0: ([], [(500, "make me really bigger")], None)}, [])
1221         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1222
1223         # now cancel them all
1224         ss.remote_cancel_lease("si1", secrets(0)[2])
1225         ss.remote_cancel_lease("si1", secrets(1)[2])
1226         ss.remote_cancel_lease("si1", secrets(2)[2])
1227         ss.remote_cancel_lease("si1", secrets(3)[2])
1228
1229         # the slot should still be there
1230         remaining_shares = read("si1", [], [(0,10)])
1231         self.failUnlessEqual(len(remaining_shares), 1)
1232         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1233
1234         # cancelling a non-existent lease should raise an IndexError
1235         self.failUnlessRaises(IndexError,
1236                               ss.remote_cancel_lease, "si1", "nonsecret")
1237
1238         # and the slot should still be there
1239         remaining_shares = read("si1", [], [(0,10)])
1240         self.failUnlessEqual(len(remaining_shares), 1)
1241         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1242
1243         ss.remote_cancel_lease("si1", secrets(4)[2])
1244         # now the slot should be gone
1245         no_shares = read("si1", [], [(0,10)])
1246         self.failUnlessEqual(no_shares, {})
1247
1248         # cancelling a lease on a non-existent share should raise an IndexError
1249         self.failUnlessRaises(IndexError,
1250                               ss.remote_cancel_lease, "si2", "nonsecret")
1251
1252     def test_remove(self):
1253         ss = self.create("test_remove")
1254         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1255                       set([0,1,2]), 100)
1256         readv = ss.remote_slot_readv
1257         writev = ss.remote_slot_testv_and_readv_and_writev
1258         secrets = ( self.write_enabler("we1"),
1259                     self.renew_secret("we1"),
1260                     self.cancel_secret("we1") )
1261         # delete sh0 by setting its size to zero
1262         answer = writev("si1", secrets,
1263                         {0: ([], [], 0)},
1264                         [])
1265         # the answer should mention all the shares that existed before the
1266         # write
1267         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1268         # but a new read should show only sh1 and sh2
1269         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1270                              {1: [""], 2: [""]})
1271
1272         # delete sh1 by setting its size to zero
1273         answer = writev("si1", secrets,
1274                         {1: ([], [], 0)},
1275                         [])
1276         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1277         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1278                              {2: [""]})
1279
1280         # delete sh2 by setting its size to zero
1281         answer = writev("si1", secrets,
1282                         {2: ([], [], 0)},
1283                         [])
1284         self.failUnlessEqual(answer, (True, {2:[]}) )
1285         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1286                              {})
1287         # and the bucket directory should now be gone
1288         si = base32.b2a("si1")
1289         # note: this is a detail of the storage server implementation, and
1290         # may change in the future
1291         prefix = si[:2]
1292         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1293         bucketdir = os.path.join(prefixdir, si)
1294         self.failUnless(os.path.exists(prefixdir), prefixdir)
1295         self.failIf(os.path.exists(bucketdir), bucketdir)
1296
1297 class Stats(unittest.TestCase):
1298
1299     def setUp(self):
1300         self.sparent = LoggingServiceParent()
1301         self._lease_secret = itertools.count()
1302     def tearDown(self):
1303         return self.sparent.stopService()
1304
1305     def workdir(self, name):
1306         basedir = os.path.join("storage", "Server", name)
1307         return basedir
1308
1309     def create(self, name):
1310         workdir = self.workdir(name)
1311         ss = StorageServer(workdir, "\x00" * 20)
1312         ss.setServiceParent(self.sparent)
1313         return ss
1314
1315     def test_latencies(self):
1316         ss = self.create("test_latencies")
1317         for i in range(10000):
1318             ss.add_latency("allocate", 1.0 * i)
1319         for i in range(1000):
1320             ss.add_latency("renew", 1.0 * i)
1321         for i in range(10):
1322             ss.add_latency("cancel", 2.0 * i)
1323         ss.add_latency("get", 5.0)
1324
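        # the server is expected to retain only the most recent 1000 samples
        # per category, so the "allocate" stats below cover samples
        # 9000..9999 (hence the ~9500 mean)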
1325         output = ss.get_latencies()
1326
1327         self.failUnlessEqual(sorted(output.keys()),
1328                              sorted(["allocate", "renew", "cancel", "get"]))
1329         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1330         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1331         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1332         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1333         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1334         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1335         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1336         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1337         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1338
1339         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1340         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1341         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1342         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1343         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1344         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1345         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1346         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1347         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1348
1349         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1350         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1351         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1, output)
1352         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1353         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1354         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1355         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1, output)
1356         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1, output)
1357         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1, output)
1358
1359         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1360         self.failUnless(abs(output["get"]["mean"] - 5) < 1, output)
1361         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1, output)
1362         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1, output)
1363         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1, output)
1364         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1, output)
1365         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1, output)
1366         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1, output)
1367         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1, output)
1368
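# remove_tags flattens rendered HTML into plain text (tags become spaces,
# runs of whitespace are collapsed) so tests can match against page contents.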
1369 def remove_tags(s):
1370     s = re.sub(r'<[^>]*>', ' ', s)
1371     s = re.sub(r'\s+', ' ', s)
1372     return s
1373
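# MyBucketCountingCrawler fires one Deferred from hook_ds (a list assigned by
# the test) each time a prefix finishes, letting tests synchronize with the
# crawler's progress.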
1374 class MyBucketCountingCrawler(BucketCountingCrawler):
1375     def finished_prefix(self, cycle, prefix):
1376         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1377         if self.hook_ds:
1378             d = self.hook_ds.pop(0)
1379             d.callback(None)
1380
1381 class MyStorageServer(StorageServer):
1382     def add_bucket_counter(self):
1383         statefile = os.path.join(self.storedir, "bucket_counter.state")
1384         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1385         self.bucket_counter.setServiceParent(self)
1386
1387 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1388
1389     def setUp(self):
1390         self.s = service.MultiService()
1391         self.s.startService()
1392     def tearDown(self):
1393         return self.s.stopService()
1394
1395     def test_bucket_counter(self):
1396         basedir = "storage/BucketCounter/bucket_counter"
1397         fileutil.make_dirs(basedir)
1398         ss = StorageServer(basedir, "\x00" * 20)
1399         # to make sure we capture the bucket-counting-crawler in the middle
1400         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1401         # also make it start sooner than usual.
1402         ss.bucket_counter.slow_start = 0
1403         orig_cpu_slice = ss.bucket_counter.cpu_slice
1404         ss.bucket_counter.cpu_slice = 0
1405         ss.setServiceParent(self.s)
1406
1407         w = StorageStatus(ss)
1408
1409         # this sample is before the crawler has started doing anything
1410         html = w.renderSynchronously()
1411         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1412         s = remove_tags(html)
1413         self.failUnlessIn("Accepting new shares: Yes", s)
1414         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1415         self.failUnlessIn("Total buckets: Not computed yet", s)
1416         self.failUnlessIn("Next crawl in", s)
1417
1418         # give the bucket-counting-crawler one tick to get started. The
1419         # cpu_slice=0 will force it to yield right after it processes the
1420         # first prefix
1421
1422         d = fireEventually()
1423         def _check(ignored):
1424             # are we really right after the first prefix?
1425             state = ss.bucket_counter.get_state()
1426             if state["last-complete-prefix"] is None:
1427                 d2 = fireEventually()
1428                 d2.addCallback(_check)
1429                 return d2
1430             self.failUnlessEqual(state["last-complete-prefix"],
1431                                  ss.bucket_counter.prefixes[0])
1432             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1433             html = w.renderSynchronously()
1434             s = remove_tags(html)
1435             self.failUnlessIn(" Current crawl ", s)
1436             self.failUnlessIn(" (next work in ", s)
1437         d.addCallback(_check)
1438
1439         # now give it enough time to complete a full cycle
1440         def _watch():
1441             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1442         d.addCallback(lambda ignored: self.poll(_watch))
1443         def _check2(ignored):
1444             ss.bucket_counter.cpu_slice = orig_cpu_slice
1445             html = w.renderSynchronously()
1446             s = remove_tags(html)
1447             self.failUnlessIn("Total buckets: 0 (the number of", s)
1448             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1449         d.addCallback(_check2)
1450         return d
1451
1452     def test_bucket_counter_cleanup(self):
1453         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1454         fileutil.make_dirs(basedir)
1455         ss = StorageServer(basedir, "\x00" * 20)
1456         # to make sure we capture the bucket-counting-crawler in the middle
1457         # of a cycle, we reach in and reduce its maximum slice time to 0.
1458         ss.bucket_counter.slow_start = 0
1459         orig_cpu_slice = ss.bucket_counter.cpu_slice
1460         ss.bucket_counter.cpu_slice = 0
1461         ss.setServiceParent(self.s)
1462
1463         d = fireEventually()
1464
1465         def _after_first_prefix(ignored):
1466             state = ss.bucket_counter.state
1467             if state["last-complete-prefix"] is None:
1468                 d2 = fireEventually()
1469                 d2.addCallback(_after_first_prefix)
1470                 return d2
1471             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1472             # now sneak in and mess with its state, to make sure it cleans up
1473             # properly at the end of the cycle
1474             self.failUnlessEqual(state["last-complete-prefix"],
1475                                  ss.bucket_counter.prefixes[0])
1476             state["bucket-counts"][-12] = {}
1477             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1478             ss.bucket_counter.save_state()
1479         d.addCallback(_after_first_prefix)
1480
1481         # now give it enough time to complete a cycle
1482         def _watch():
1483             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1484         d.addCallback(lambda ignored: self.poll(_watch))
1485         def _check2(ignored):
1486             ss.bucket_counter.cpu_slice = orig_cpu_slice
1487             s = ss.bucket_counter.get_state()
1488             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1489             self.failIf("bogusprefix!" in s["storage-index-samples"],
1490                         s["storage-index-samples"].keys())
1491         d.addCallback(_check2)
1492         return d
1493
1494     def test_bucket_counter_eta(self):
1495         basedir = "storage/BucketCounter/bucket_counter_eta"
1496         fileutil.make_dirs(basedir)
1497         ss = MyStorageServer(basedir, "\x00" * 20)
1498         ss.bucket_counter.slow_start = 0
1499         # these will be fired inside finished_prefix()
1500         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1501         w = StorageStatus(ss)
1502
1503         d = defer.Deferred()
1504
1505         def _check_1(ignored):
1506             # no ETA is available yet
1507             html = w.renderSynchronously()
1508             s = remove_tags(html)
1509             self.failUnlessIn("complete (next work", s)
1510
1511         def _check_2(ignored):
1512             # one prefix has finished, so an ETA based upon that elapsed time
1513             # should be available.
1514             html = w.renderSynchronously()
1515             s = remove_tags(html)
1516             self.failUnlessIn("complete (ETA ", s)
1517
1518         def _check_3(ignored):
1519             # two prefixes have finished
1520             html = w.renderSynchronously()
1521             s = remove_tags(html)
1522             self.failUnlessIn("complete (ETA ", s)
1523             d.callback("done")
1524
1525         hooks[0].addCallback(_check_1).addErrback(d.errback)
1526         hooks[1].addCallback(_check_2).addErrback(d.errback)
1527         hooks[2].addCallback(_check_3).addErrback(d.errback)
1528
1529         ss.setServiceParent(self.s)
1530         return d
1531
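# InstrumentedLeaseCheckingCrawler lets a test pause the lease crawler right
# after the first bucket (by driving cpu_slice negative), then resume it with
# a generous slice once it yields.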
1532 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1533     stop_after_first_bucket = False
1534     def process_bucket(self, *args, **kwargs):
1535         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1536         if self.stop_after_first_bucket:
1537             self.stop_after_first_bucket = False
1538             self.cpu_slice = -1.0
1539     def yielding(self, sleep_time):
1540         if not self.stop_after_first_bucket:
1541             self.cpu_slice = 500
1542
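# BrokenStatResults and No_ST_BLOCKS_LeaseCheckingCrawler simulate platforms
# whose os.stat() results lack the st_blocks field (e.g. Windows); the
# expirer is then expected to fall back to diskbytes==sharebytes.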
1543 class BrokenStatResults:
1544     pass
1545 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1546     def stat(self, fn):
1547         s = os.stat(fn)
1548         bsr = BrokenStatResults()
1549         for attrname in dir(s):
1550             if attrname.startswith("_"):
1551                 continue
1552             if attrname == "st_blocks":
1553                 continue
1554             setattr(bsr, attrname, getattr(s, attrname))
1555         return bsr
1556
1557 class InstrumentedStorageServer(StorageServer):
1558     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1559 class No_ST_BLOCKS_StorageServer(StorageServer):
1560     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1561
1562 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1563
1564     def setUp(self):
1565         self.s = service.MultiService()
1566         self.s.startService()
1567     def tearDown(self):
1568         return self.s.stopService()
1569
1570     def make_shares(self, ss):
1571         def make(si):
1572             return (si, hashutil.tagged_hash("renew", si),
1573                     hashutil.tagged_hash("cancel", si))
1574         def make_mutable(si):
1575             return (si, hashutil.tagged_hash("renew", si),
1576                     hashutil.tagged_hash("cancel", si),
1577                     hashutil.tagged_hash("write-enabler", si))
1578         def make_extra_lease(si, num):
1579             return (hashutil.tagged_hash("renew-%d" % num, si),
1580                     hashutil.tagged_hash("cancel-%d" % num, si))
1581
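        # build two immutable and two mutable shares; immutable_si_1 and
        # mutable_si_3 each get a second lease, so the tests see both
        # single-lease and multi-lease shares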
1582         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1583         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1584         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1585         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1586         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1587         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1588         sharenums = [0]
1589         canary = FakeCanary()
1590         # note: 'tahoe debug dump-share' will not handle this file, since the
1591         # inner contents are not a valid CHK share
1592         data = "\xff" * 1000
1593
1594         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1595                                          1000, canary)
1596         w[0].remote_write(0, data)
1597         w[0].remote_close()
1598
1599         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1600                                          1000, canary)
1601         w[0].remote_write(0, data)
1602         w[0].remote_close()
1603         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1604
1605         writev = ss.remote_slot_testv_and_readv_and_writev
1606         writev(mutable_si_2, (we2, rs2, cs2),
1607                {0: ([], [(0,data)], len(data))}, [])
1608         writev(mutable_si_3, (we3, rs3, cs3),
1609                {0: ([], [(0,data)], len(data))}, [])
1610         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1611
1612         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1613         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1614         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1615
1616     def test_basic(self):
1617         basedir = "storage/LeaseCrawler/basic"
1618         fileutil.make_dirs(basedir)
1619         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1620         # make it start sooner than usual.
1621         lc = ss.lease_checker
1622         lc.slow_start = 0
1623         lc.cpu_slice = 500
1624         lc.stop_after_first_bucket = True
1625         webstatus = StorageStatus(ss)
1626
1627         # create a few shares, with some leases on them
1628         self.make_shares(ss)
1629         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1630
1631         # add a non-sharefile to exercise another code path
1632         fn = os.path.join(ss.sharedir,
1633                           storage_index_to_dir(immutable_si_0),
1634                           "not-a-share")
1635         f = open(fn, "wb")
1636         f.write("I am not a share.\n")
1637         f.close()
1638
1639         # this is before the crawl has started, so we're not in a cycle yet
1640         initial_state = lc.get_state()
1641         self.failIf(lc.get_progress()["cycle-in-progress"])
1642         self.failIfIn("cycle-to-date", initial_state)
1643         self.failIfIn("estimated-remaining-cycle", initial_state)
1644         self.failIfIn("estimated-current-cycle", initial_state)
1645         self.failUnlessIn("history", initial_state)
1646         self.failUnlessEqual(initial_state["history"], {})
1647
1648         ss.setServiceParent(self.s)
1649
1650         DAY = 24*60*60
1651
1652         d = fireEventually()
1653
1654         # now examine the state right after the first bucket has been
1655         # processed.
1656         def _after_first_bucket(ignored):
1657             initial_state = lc.get_state()
1658             if "cycle-to-date" not in initial_state:
1659                 d2 = fireEventually()
1660                 d2.addCallback(_after_first_bucket)
1661                 return d2
1662             self.failUnlessIn("cycle-to-date", initial_state)
1663             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1664             self.failUnlessIn("estimated-current-cycle", initial_state)
1665             self.failUnlessIn("history", initial_state)
1666             self.failUnlessEqual(initial_state["history"], {})
1667
1668             so_far = initial_state["cycle-to-date"]
1669             self.failUnlessEqual(so_far["expiration-enabled"], False)
1670             self.failUnlessIn("configured-expiration-mode", so_far)
1671             self.failUnlessIn("lease-age-histogram", so_far)
1672             lah = so_far["lease-age-histogram"]
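            # each histogram entry is a (min-age, max-age, count) tuple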
1673             self.failUnlessEqual(type(lah), list)
1674             self.failUnlessEqual(len(lah), 1)
1675             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1676             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1677             self.failUnlessEqual(so_far["corrupt-shares"], [])
1678             sr1 = so_far["space-recovered"]
1679             self.failUnlessEqual(sr1["examined-buckets"], 1)
1680             self.failUnlessEqual(sr1["examined-shares"], 1)
1681             self.failUnlessEqual(sr1["actual-shares"], 0)
1682             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1683             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1684             left = initial_state["estimated-remaining-cycle"]
1685             sr2 = left["space-recovered"]
1686             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1687             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1688             self.failIfEqual(sr2["actual-shares"], None)
1689             self.failIfEqual(sr2["configured-diskbytes"], None)
1690             self.failIfEqual(sr2["original-sharebytes"], None)
1691         d.addCallback(_after_first_bucket)
1692         d.addCallback(lambda ign: self.render1(webstatus))
1693         def _check_html_in_cycle(html):
1694             s = remove_tags(html)
1695             self.failUnlessIn("So far, this cycle has examined "
1696                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1697             self.failUnlessIn("and has recovered: "
1698                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1699                               "0 B (0 B / 0 B)", s)
1700             self.failUnlessIn("If expiration were enabled, "
1701                               "we would have recovered: "
1702                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1703                               " 0 B (0 B / 0 B) by now", s)
1704             self.failUnlessIn("and the remainder of this cycle "
1705                               "would probably recover: "
1706                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1707                               " 0 B (0 B / 0 B)", s)
1708             self.failUnlessIn("and the whole cycle would probably recover: "
1709                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1710                               " 0 B (0 B / 0 B)", s)
1711             self.failUnlessIn("if we were strictly using each lease's default "
1712                               "31-day lease lifetime", s)
1713             self.failUnlessIn("this cycle would be expected to recover: ", s)
1714         d.addCallback(_check_html_in_cycle)
1715
1716         # wait for the crawler to finish the first cycle. Nothing should have
1717         # been removed.
1718         def _wait():
1719             return lc.get_state()["last-cycle-finished"] is not None
1720         d.addCallback(lambda ign: self.poll(_wait))
1721
1722         def _after_first_cycle(ignored):
1723             s = lc.get_state()
1724             self.failIf("cycle-to-date" in s)
1725             self.failIf("estimated-remaining-cycle" in s)
1726             self.failIf("estimated-current-cycle" in s)
1727             last = s["history"][0]
1728             self.failUnlessIn("cycle-start-finish-times", last)
1729             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1730             self.failUnlessEqual(last["expiration-enabled"], False)
1731             self.failUnlessIn("configured-expiration-mode", last)
1732
1733             self.failUnlessIn("lease-age-histogram", last)
1734             lah = last["lease-age-histogram"]
1735             self.failUnlessEqual(type(lah), list)
1736             self.failUnlessEqual(len(lah), 1)
1737             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1738
1739             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1740             self.failUnlessEqual(last["corrupt-shares"], [])
1741
1742             rec = last["space-recovered"]
1743             self.failUnlessEqual(rec["examined-buckets"], 4)
1744             self.failUnlessEqual(rec["examined-shares"], 4)
1745             self.failUnlessEqual(rec["actual-buckets"], 0)
1746             self.failUnlessEqual(rec["original-buckets"], 0)
1747             self.failUnlessEqual(rec["configured-buckets"], 0)
1748             self.failUnlessEqual(rec["actual-shares"], 0)
1749             self.failUnlessEqual(rec["original-shares"], 0)
1750             self.failUnlessEqual(rec["configured-shares"], 0)
1751             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1752             self.failUnlessEqual(rec["original-diskbytes"], 0)
1753             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1754             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1755             self.failUnlessEqual(rec["original-sharebytes"], 0)
1756             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1757
1758             def _get_sharefile(si):
1759                 return list(ss._iter_share_files(si))[0]
1760             def count_leases(si):
1761                 return len(list(_get_sharefile(si).get_leases()))
1762             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1763             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1764             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1765             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1766         d.addCallback(_after_first_cycle)
1767         d.addCallback(lambda ign: self.render1(webstatus))
1768         def _check_html(html):
1769             s = remove_tags(html)
1770             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1771                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1772             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1773                               "(2 mutable / 2 immutable),", s)
1774             self.failUnlessIn("but expiration was not enabled", s)
1775         d.addCallback(_check_html)
1776         d.addCallback(lambda ign: self.render_json(webstatus))
1777         def _check_json(json):
1778             data = simplejson.loads(json)
1779             self.failUnlessIn("lease-checker", data)
1780             self.failUnlessIn("lease-checker-progress", data)
1781         d.addCallback(_check_json)
1782         return d
1783
1784     def backdate_lease(self, sf, renew_secret, new_expire_time):
1785         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1786         # "renew" a lease with a new_expire_time that is older than what the
1787         # current lease has), so we have to reach inside it.
1788         for i,lease in enumerate(sf.get_leases()):
1789             if lease.renew_secret == renew_secret:
1790                 lease.expiration_time = new_expire_time
1791                 f = open(sf.home, 'rb+')
1792                 sf._write_lease_record(f, i, lease)
1793                 f.close()
1794                 return
1795         raise IndexError("unable to renew non-existent lease")
1796
1797     def test_expire_age(self):
1798         basedir = "storage/LeaseCrawler/expire_age"
1799         fileutil.make_dirs(basedir)
1800         # setting expiration_override_lease_duration to 2000 means that
1801         # any lease which is more than 2000s old will be expired.
1802         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1803                                        expiration_enabled=True,
1804                                        expiration_mode="age",
1805                                        expiration_override_lease_duration=2000)
1806         # make it start sooner than usual.
1807         lc = ss.lease_checker
1808         lc.slow_start = 0
1809         lc.stop_after_first_bucket = True
1810         webstatus = StorageStatus(ss)
1811
1812         # create a few shares, with some leases on them
1813         self.make_shares(ss)
1814         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1815
1816         def count_shares(si):
1817             return len(list(ss._iter_share_files(si)))
1818         def _get_sharefile(si):
1819             return list(ss._iter_share_files(si))[0]
1820         def count_leases(si):
1821             return len(list(_get_sharefile(si).get_leases()))
1822
1823         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1824         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1825         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1826         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1827         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1828         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1829         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1830         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1831
1832         # artificially crank back the expiration time on the first lease of
1833         # each share, to make it look like it expired already (age=1000s).
1834         # Some shares have an extra lease which is set to expire at the
1835         # default time in 31 days from now (age=31days). We then run the
1836         # crawler, which will expire the first lease, making some shares get
1837         # deleted and others stay alive (with one remaining lease)
1838         now = time.time()
1839
1840         sf0 = _get_sharefile(immutable_si_0)
1841         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1842         sf0_size = os.stat(sf0.home).st_size
1843
1844         # immutable_si_1 gets an extra lease
1845         sf1 = _get_sharefile(immutable_si_1)
1846         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1847
1848         sf2 = _get_sharefile(mutable_si_2)
1849         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1850         sf2_size = os.stat(sf2.home).st_size
1851
1852         # mutable_si_3 gets an extra lease
1853         sf3 = _get_sharefile(mutable_si_3)
1854         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1855
1856         ss.setServiceParent(self.s)
1857
1858         d = fireEventually()
1859         # examine the state right after the first bucket has been processed
1860         def _after_first_bucket(ignored):
1861             p = lc.get_progress()
1862             if not p["cycle-in-progress"]:
1863                 d2 = fireEventually()
1864                 d2.addCallback(_after_first_bucket)
1865                 return d2
1866         d.addCallback(_after_first_bucket)
1867         d.addCallback(lambda ign: self.render1(webstatus))
1868         def _check_html_in_cycle(html):
1869             s = remove_tags(html)
1870             # the first bucket encountered gets deleted, and its prefix
1871             # happens to be about 1/5th of the way through the ring, so the
1872             # predictor thinks we'll have 5 shares and that we'll delete them
1873             # all. This part of the test depends upon the SIs landing right
1874             # where they do now.
1875             self.failUnlessIn("The remainder of this cycle is expected to "
1876                               "recover: 4 shares, 4 buckets", s)
1877             self.failUnlessIn("The whole cycle is expected to examine "
1878                               "5 shares in 5 buckets and to recover: "
1879                               "5 shares, 5 buckets", s)
1880         d.addCallback(_check_html_in_cycle)
1881
1882         # wait for the crawler to finish the first cycle. Two shares should
1883         # have been removed
1884         def _wait():
1885             return lc.get_state()["last-cycle-finished"] is not None
1886         d.addCallback(lambda ign: self.poll(_wait))
1887
1888         def _after_first_cycle(ignored):
1889             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1890             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1891             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1892             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1893             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1894             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1895
1896             s = lc.get_state()
1897             last = s["history"][0]
1898
1899             self.failUnlessEqual(last["expiration-enabled"], True)
1900             self.failUnlessEqual(last["configured-expiration-mode"],
1901                                  ("age", 2000, None, ("mutable", "immutable")))
1902             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1903
1904             rec = last["space-recovered"]
1905             self.failUnlessEqual(rec["examined-buckets"], 4)
1906             self.failUnlessEqual(rec["examined-shares"], 4)
1907             self.failUnlessEqual(rec["actual-buckets"], 2)
1908             self.failUnlessEqual(rec["original-buckets"], 2)
1909             self.failUnlessEqual(rec["configured-buckets"], 2)
1910             self.failUnlessEqual(rec["actual-shares"], 2)
1911             self.failUnlessEqual(rec["original-shares"], 2)
1912             self.failUnlessEqual(rec["configured-shares"], 2)
1913             size = sf0_size + sf2_size
1914             self.failUnlessEqual(rec["actual-sharebytes"], size)
1915             self.failUnlessEqual(rec["original-sharebytes"], size)
1916             self.failUnlessEqual(rec["configured-sharebytes"], size)
1917             # different platforms have different notions of "blocks used by
1918             # this file", so merely assert that it's a number
1919             self.failUnless(rec["actual-diskbytes"] >= 0,
1920                             rec["actual-diskbytes"])
1921             self.failUnless(rec["original-diskbytes"] >= 0,
1922                             rec["original-diskbytes"])
1923             self.failUnless(rec["configured-diskbytes"] >= 0,
1924                             rec["configured-diskbytes"])
1925         d.addCallback(_after_first_cycle)
1926         d.addCallback(lambda ign: self.render1(webstatus))
1927         def _check_html(html):
1928             s = remove_tags(html)
1929             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1930             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1931             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1932         d.addCallback(_check_html)
1933         return d
1934
1935     def test_expire_cutoff_date(self):
1936         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1937         fileutil.make_dirs(basedir)
1938         # setting cutoff-date to 2000 seconds ago means that any lease which
1939         # is more than 2000s old will be expired.
1940         now = time.time()
1941         then = int(now - 2000)
1942         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1943                                        expiration_enabled=True,
1944                                        expiration_mode="cutoff-date",
1945                                        expiration_cutoff_date=then)
1946         # make it start sooner than usual.
1947         lc = ss.lease_checker
1948         lc.slow_start = 0
1949         lc.stop_after_first_bucket = True
1950         webstatus = StorageStatus(ss)
1951
1952         # create a few shares, with some leases on them
1953         self.make_shares(ss)
1954         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1955
1956         def count_shares(si):
1957             return len(list(ss._iter_share_files(si)))
1958         def _get_sharefile(si):
1959             return list(ss._iter_share_files(si))[0]
1960         def count_leases(si):
1961             return len(list(_get_sharefile(si).get_leases()))
1962
1963         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1964         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1965         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1966         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1967         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1968         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1969         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1970         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1971
1972         # artificially crank back the expiration time on the first lease of
1973         # each share, to make it look like it was renewed 3000s ago. To achieve
1974         # this, we need to set the expiration time to now-3000+31days. This
1975         # will change when the lease format is improved to contain both
1976         # create/renew time and duration.
1977         new_expiration_time = now - 3000 + 31*24*60*60
1978
1979         # Some shares have an extra lease which is set to expire at the
1980         # default time in 31 days from now (age=31days). We then run the
1981         # crawler, which will expire the first lease, making some shares get
1982         # deleted and others stay alive (with one remaining lease)
1983
1984         sf0 = _get_sharefile(immutable_si_0)
1985         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1986         sf0_size = os.stat(sf0.home).st_size
1987
1988         # immutable_si_1 gets an extra lease
1989         sf1 = _get_sharefile(immutable_si_1)
1990         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1991
1992         sf2 = _get_sharefile(mutable_si_2)
1993         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1994         sf2_size = os.stat(sf2.home).st_size
1995
1996         # mutable_si_3 gets an extra lease
1997         sf3 = _get_sharefile(mutable_si_3)
1998         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1999
2000         ss.setServiceParent(self.s)
2001
2002         d = fireEventually()
2003         # examine the state right after the first bucket has been processed
2004         def _after_first_bucket(ignored):
2005             p = lc.get_progress()
2006             if not p["cycle-in-progress"]:
2007                 d2 = fireEventually()
2008                 d2.addCallback(_after_first_bucket)
2009                 return d2
2010         d.addCallback(_after_first_bucket)
2011         d.addCallback(lambda ign: self.render1(webstatus))
2012         def _check_html_in_cycle(html):
2013             s = remove_tags(html)
2014             # the first bucket encountered gets deleted, and its prefix
2015             # happens to be about 1/5th of the way through the ring, so the
2016             # predictor thinks we'll have 5 shares and that we'll delete them
2017             # all. This part of the test depends upon the SIs landing right
2018             # where they do now.
2019             self.failUnlessIn("The remainder of this cycle is expected to "
2020                               "recover: 4 shares, 4 buckets", s)
2021             self.failUnlessIn("The whole cycle is expected to examine "
2022                               "5 shares in 5 buckets and to recover: "
2023                               "5 shares, 5 buckets", s)
2024         d.addCallback(_check_html_in_cycle)
2025
2026         # wait for the crawler to finish the first cycle. Two shares should
2027         # have been removed
2028         def _wait():
2029             return bool(lc.get_state()["last-cycle-finished"] is not None)
2030         d.addCallback(lambda ign: self.poll(_wait))
2031
2032         def _after_first_cycle(ignored):
2033             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2034             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2035             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2036             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2037             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2038             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2039
2040             s = lc.get_state()
2041             last = s["history"][0]
2042
2043             self.failUnlessEqual(last["expiration-enabled"], True)
2044             self.failUnlessEqual(last["configured-expiration-mode"],
2045                                  ("cutoff-date", None, then,
2046                                   ("mutable", "immutable")))
2047             self.failUnlessEqual(last["leases-per-share-histogram"],
2048                                  {1: 2, 2: 2})
2049
2050             rec = last["space-recovered"]
2051             self.failUnlessEqual(rec["examined-buckets"], 4)
2052             self.failUnlessEqual(rec["examined-shares"], 4)
2053             self.failUnlessEqual(rec["actual-buckets"], 2)
2054             self.failUnlessEqual(rec["original-buckets"], 0)
2055             self.failUnlessEqual(rec["configured-buckets"], 2)
2056             self.failUnlessEqual(rec["actual-shares"], 2)
2057             self.failUnlessEqual(rec["original-shares"], 0)
2058             self.failUnlessEqual(rec["configured-shares"], 2)
2059             size = sf0_size + sf2_size
2060             self.failUnlessEqual(rec["actual-sharebytes"], size)
2061             self.failUnlessEqual(rec["original-sharebytes"], 0)
2062             self.failUnlessEqual(rec["configured-sharebytes"], size)
2063             # different platforms have different notions of "blocks used by
2064             # this file", so merely assert that it's a number
2065             self.failUnless(rec["actual-diskbytes"] >= 0,
2066                             rec["actual-diskbytes"])
2067             self.failUnless(rec["original-diskbytes"] >= 0,
2068                             rec["original-diskbytes"])
2069             self.failUnless(rec["configured-diskbytes"] >= 0,
2070                             rec["configured-diskbytes"])
2071         d.addCallback(_after_first_cycle)
2072         d.addCallback(lambda ign: self.render1(webstatus))
2073         def _check_html(html):
2074             s = remove_tags(html)
2075             self.failUnlessIn("Expiration Enabled:"
2076                               " expired leases will be removed", s)
2077             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2078             substr = "Leases created or last renewed before %s will be considered expired." % date
2079             self.failUnlessIn(substr, s)
2080             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2081         d.addCallback(_check_html)
2082         return d
2083
2084     def test_only_immutable(self):
2085         basedir = "storage/LeaseCrawler/only_immutable"
2086         fileutil.make_dirs(basedir)
2087         now = time.time()
2088         then = int(now - 2000)
2089         ss = StorageServer(basedir, "\x00" * 20,
2090                            expiration_enabled=True,
2091                            expiration_mode="cutoff-date",
2092                            expiration_cutoff_date=then,
2093                            expiration_sharetypes=("immutable",))
2094         lc = ss.lease_checker
2095         lc.slow_start = 0
2096         webstatus = StorageStatus(ss)
2097
2098         self.make_shares(ss)
2099         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2100         # set all leases to be expirable
2101         new_expiration_time = now - 3000 + 31*24*60*60
2102
2103         def count_shares(si):
2104             return len(list(ss._iter_share_files(si)))
2105         def _get_sharefile(si):
2106             return list(ss._iter_share_files(si))[0]
2107         def count_leases(si):
2108             return len(list(_get_sharefile(si).get_leases()))
2109
2110         sf0 = _get_sharefile(immutable_si_0)
2111         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2112         sf1 = _get_sharefile(immutable_si_1)
2113         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2114         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2115         sf2 = _get_sharefile(mutable_si_2)
2116         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2117         sf3 = _get_sharefile(mutable_si_3)
2118         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2119         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2120
2121         ss.setServiceParent(self.s)
2122         def _wait():
2123             return lc.get_state()["last-cycle-finished"] is not None
2124         d = self.poll(_wait)
2125
2126         def _after_first_cycle(ignored):
2127             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2128             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2129             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2130             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2131             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2132             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2133         d.addCallback(_after_first_cycle)
2134         d.addCallback(lambda ign: self.render1(webstatus))
2135         def _check_html(html):
2136             s = remove_tags(html)
2137             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2138         d.addCallback(_check_html)
2139         return d
2140
2141     def test_only_mutable(self):
2142         basedir = "storage/LeaseCrawler/only_mutable"
2143         fileutil.make_dirs(basedir)
2144         now = time.time()
2145         then = int(now - 2000)
2146         ss = StorageServer(basedir, "\x00" * 20,
2147                            expiration_enabled=True,
2148                            expiration_mode="cutoff-date",
2149                            expiration_cutoff_date=then,
2150                            expiration_sharetypes=("mutable",))
2151         lc = ss.lease_checker
2152         lc.slow_start = 0
2153         webstatus = StorageStatus(ss)
2154
2155         self.make_shares(ss)
2156         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2157         # set all leases to be expirable
2158         new_expiration_time = now - 3000 + 31*24*60*60
2159
2160         def count_shares(si):
2161             return len(list(ss._iter_share_files(si)))
2162         def _get_sharefile(si):
2163             return list(ss._iter_share_files(si))[0]
2164         def count_leases(si):
2165             return len(list(_get_sharefile(si).get_leases()))
2166
2167         sf0 = _get_sharefile(immutable_si_0)
2168         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2169         sf1 = _get_sharefile(immutable_si_1)
2170         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2171         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2172         sf2 = _get_sharefile(mutable_si_2)
2173         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2174         sf3 = _get_sharefile(mutable_si_3)
2175         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2176         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2177
2178         ss.setServiceParent(self.s)
2179         def _wait():
2180             return lc.get_state()["last-cycle-finished"] is not None
2181         d = self.poll(_wait)
2182
2183         def _after_first_cycle(ignored):
2184             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2185             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2186             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2187             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2188             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2189             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2190         d.addCallback(_after_first_cycle)
2191         d.addCallback(lambda ign: self.render1(webstatus))
2192         def _check_html(html):
2193             s = remove_tags(html)
2194             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2195         d.addCallback(_check_html)
2196         return d
2197
2198     def test_bad_mode(self):
2199         basedir = "storage/LeaseCrawler/bad_mode"
2200         fileutil.make_dirs(basedir)
2201         e = self.failUnlessRaises(ValueError,
2202                                   StorageServer, basedir, "\x00" * 20,
2203                                   expiration_mode="bogus")
2204         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2205
2206     def test_parse_duration(self):
2207         DAY = 24*60*60
2208         MONTH = 31*DAY
2209         YEAR = 365*DAY
2210         p = time_format.parse_duration
2211         self.failUnlessEqual(p("7days"), 7*DAY)
2212         self.failUnlessEqual(p("31day"), 31*DAY)
2213         self.failUnlessEqual(p("60 days"), 60*DAY)
2214         self.failUnlessEqual(p("2mo"), 2*MONTH)
2215         self.failUnlessEqual(p("3 month"), 3*MONTH)
2216         self.failUnlessEqual(p("2years"), 2*YEAR)
2217         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2218         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2219
2220     def test_parse_date(self):
2221         p = time_format.parse_date
2222         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2223         self.failUnlessEqual(p("2009-03-18"), 1237334400)
2224
2225     def test_limited_history(self):
2226         basedir = "storage/LeaseCrawler/limited_history"
2227         fileutil.make_dirs(basedir)
2228         ss = StorageServer(basedir, "\x00" * 20)
2229         # make it start sooner than usual.
2230         lc = ss.lease_checker
2231         lc.slow_start = 0
2232         lc.cpu_slice = 500
2233
2234         # create a few shares, with some leases on them
2235         self.make_shares(ss)
2236
2237         ss.setServiceParent(self.s)
2238
2239         def _wait_until_15_cycles_done():
2240             last = lc.state["last-cycle-finished"]
2241             if last is not None and last >= 15:
2242                 return True
2243             if lc.timer:
2244                 lc.timer.reset(0)
2245             return False
2246         d = self.poll(_wait_until_15_cycles_done)
2247
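        # the crawler is expected to keep only the 10 most recent cycles of
        # history, so after cycle 15 finishes, cycles 6..15 should remain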
2248         def _check(ignored):
2249             s = lc.get_state()
2250             h = s["history"]
2251             self.failUnlessEqual(len(h), 10)
2252             self.failUnlessEqual(max(h.keys()), 15)
2253             self.failUnlessEqual(min(h.keys()), 6)
2254         d.addCallback(_check)
2255         return d
2256
2257     def test_unpredictable_future(self):
2258         basedir = "storage/LeaseCrawler/unpredictable_future"
2259         fileutil.make_dirs(basedir)
2260         ss = StorageServer(basedir, "\x00" * 20)
2261         # make it start sooner than usual.
2262         lc = ss.lease_checker
2263         lc.slow_start = 0
2264         lc.cpu_slice = -1.0 # stop quickly
2265
2266         self.make_shares(ss)
2267
2268         ss.setServiceParent(self.s)
2269
2270         d = fireEventually()
2271         def _check(ignored):
2272             # this should fire after the first bucket is complete, but before
2273             # the first prefix is complete, so the progress-measurer won't
2274             # think we've gotten far enough to raise our percent-complete
2275             # above 0%, triggering the cannot-predict-the-future code in
2276             # expirer.py. This will have to change if/when the
2277             # progress-measurer gets smart enough to count buckets (we'll
2278             # have to interrupt it even earlier, before it's finished the
2279             # first bucket).
2280             s = lc.get_state()
2281             if "cycle-to-date" not in s:
2282                 d2 = fireEventually()
2283                 d2.addCallback(_check)
2284                 return d2
2285             self.failUnlessIn("cycle-to-date", s)
2286             self.failUnlessIn("estimated-remaining-cycle", s)
2287             self.failUnlessIn("estimated-current-cycle", s)
2288
2289             left = s["estimated-remaining-cycle"]["space-recovered"]
2290             self.failUnlessEqual(left["actual-buckets"], None)
2291             self.failUnlessEqual(left["original-buckets"], None)
2292             self.failUnlessEqual(left["configured-buckets"], None)
2293             self.failUnlessEqual(left["actual-shares"], None)
2294             self.failUnlessEqual(left["original-shares"], None)
2295             self.failUnlessEqual(left["configured-shares"], None)
2296             self.failUnlessEqual(left["actual-diskbytes"], None)
2297             self.failUnlessEqual(left["original-diskbytes"], None)
2298             self.failUnlessEqual(left["configured-diskbytes"], None)
2299             self.failUnlessEqual(left["actual-sharebytes"], None)
2300             self.failUnlessEqual(left["original-sharebytes"], None)
2301             self.failUnlessEqual(left["configured-sharebytes"], None)
2302
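            # the full-cycle estimate should be just as unknowable here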
2303             full = s["estimated-remaining-cycle"]["space-recovered"]
2304             self.failUnlessEqual(full["actual-buckets"], None)
2305             self.failUnlessEqual(full["original-buckets"], None)
2306             self.failUnlessEqual(full["configured-buckets"], None)
2307             self.failUnlessEqual(full["actual-shares"], None)
2308             self.failUnlessEqual(full["original-shares"], None)
2309             self.failUnlessEqual(full["configured-shares"], None)
2310             self.failUnlessEqual(full["actual-diskbytes"], None)
2311             self.failUnlessEqual(full["original-diskbytes"], None)
2312             self.failUnlessEqual(full["configured-diskbytes"], None)
2313             self.failUnlessEqual(full["actual-sharebytes"], None)
2314             self.failUnlessEqual(full["original-sharebytes"], None)
2315             self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
        return d

    def test_no_st_blocks(self):
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_override_lease_duration means the
        # "configured-" space-recovered counts will be non-zero, since every
        # share will already have expired
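        # (in "age" mode the override makes each lease expire 'duration'
        # seconds after its last renewal, so a duration of -1000 puts every
        # lease's expiration time 1000 seconds in the past)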

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
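        # corrupting a share (below) makes the lease-checker log errors;
        # listing them here tells PollMixin not to treat those expected
        # logged errors as a reason to abort the poll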
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
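        # (a mutable container begins with MutableShareFile's magic string,
        # and an immutable share begins with container version 1; "BAD
        # MAGIC" matches neither, so both checks reject the file)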

        # also create an empty bucket (storage_index_to_dir expects the
        # binary storage index, not its base32 form)
        empty_si = "\x04"*16
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)
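        # a bucket directory with no share files in it still counts as an
        # examined bucket, which is why the full-cycle numbers below expect
        # 5 buckets but only 3 readable shares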

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))
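        # poll() re-runs _wait at short intervals until it returns True,
        # i.e. until the lease-checker has finished its first complete cycle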

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})

class NoDiskStatsServer(StorageServer):
    # simulate a platform whose disk-stats API is missing entirely
    def get_disk_stats(self):
        raise AttributeError

class BadDiskStatsServer(StorageServer):
    # simulate a disk-stats API that exists but fails when called
    def get_disk_stats(self):
        raise OSError

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})

    def test_status_no_disk_stats(self):
        # Some platforms may have no disk stats API. Make sure the code can
        # handle that (this test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = NoDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), None)

    def test_status_bad_disk_stats(self):
        # If the API to get disk stats exists but a call to it fails, then
        # the status should show that no shares will be accepted, and
        # get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = BadDiskStatsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)
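        # contrast with test_status_no_disk_stats above: a missing API
        # (AttributeError) yields None ("unknown"), while a failing call
        # (OSError) yields 0, making the server refuse new shares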

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        # unlike test_reserved above, use a reservation large enough to
        # exercise the larger abbreviation units
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e12)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 15.00 TB (15000000000000)", s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
